|
@@ -92,6 +92,9 @@
|
|
|
-export([push/3]).
|
|
|
-export([push/4]).
|
|
|
|
|
|
+%% Stream handlers.
|
|
|
+-export([cast/2]).
|
|
|
+
|
|
|
%% Internal.
|
|
|
-export([response_headers/2]).
|
|
|
|
|
@@ -516,12 +519,12 @@ read_body(Req=#{has_body := false}, _) ->
|
|
|
{ok, <<>>, Req};
|
|
|
read_body(Req=#{has_read_body := true}, _) ->
|
|
|
{ok, <<>>, Req};
|
|
|
-read_body(Req=#{pid := Pid, streamid := StreamID}, Opts) ->
|
|
|
+read_body(Req, Opts) ->
|
|
|
Length = maps:get(length, Opts, 8000000),
|
|
|
Period = maps:get(period, Opts, 15000),
|
|
|
Timeout = maps:get(timeout, Opts, Period + 1000),
|
|
|
Ref = make_ref(),
|
|
|
- Pid ! {{Pid, StreamID}, {read_body, self(), Ref, Length, Period}},
|
|
|
+ cast({read_body, self(), Ref, Length, Period}, Req),
|
|
|
receive
|
|
|
{request_body, Ref, nofin, Body} ->
|
|
|
{more, Body, Req};
|
|
@@ -775,10 +778,8 @@ inform(Status, Req) ->
|
|
|
-spec inform(cowboy:http_status(), cowboy:http_headers(), req()) -> ok.
|
|
|
inform(_, _, #{has_sent_resp := _}) ->
|
|
|
error(function_clause); %% @todo Better error message.
|
|
|
-inform(Status, Headers, #{pid := Pid, streamid := StreamID})
|
|
|
- when is_integer(Status); is_binary(Status) ->
|
|
|
- Pid ! {{Pid, StreamID}, {inform, Status, Headers}},
|
|
|
- ok.
|
|
|
+inform(Status, Headers, Req) when is_integer(Status); is_binary(Status) ->
|
|
|
+ cast({inform, Status, Headers}, Req).
|
|
|
|
|
|
-spec reply(cowboy:http_status(), Req) -> Req when Req::req().
|
|
|
reply(Status, Req) ->
|
|
@@ -823,11 +824,11 @@ reply(Status, Headers, Body, Req)
|
|
|
%% Don't send any body for HEAD responses. While the protocol code is
|
|
|
%% supposed to enforce this rule, we prefer to avoid copying too much
|
|
|
%% data around if we can avoid it.
|
|
|
-do_reply(Status, Headers, _, Req=#{pid := Pid, streamid := StreamID, method := <<"HEAD">>}) ->
|
|
|
- Pid ! {{Pid, StreamID}, {response, Status, response_headers(Headers, Req), <<>>}},
|
|
|
+do_reply(Status, Headers, _, Req=#{method := <<"HEAD">>}) ->
|
|
|
+ cast({response, Status, response_headers(Headers, Req), <<>>}, Req),
|
|
|
done_replying(Req, true);
|
|
|
-do_reply(Status, Headers, Body, Req=#{pid := Pid, streamid := StreamID}) ->
|
|
|
- Pid ! {{Pid, StreamID}, {response, Status, response_headers(Headers, Req), Body}},
|
|
|
+do_reply(Status, Headers, Body, Req) ->
|
|
|
+ cast({response, Status, response_headers(Headers, Req), Body}, Req),
|
|
|
done_replying(Req, true).
|
|
|
|
|
|
done_replying(Req, HasSentResp) ->
|
|
@@ -841,9 +842,8 @@ stream_reply(Status, Req) ->
|
|
|
-> Req when Req::req().
|
|
|
stream_reply(_, _, #{has_sent_resp := _}) ->
|
|
|
error(function_clause);
|
|
|
-stream_reply(Status, Headers=#{}, Req=#{pid := Pid, streamid := StreamID})
|
|
|
- when is_integer(Status); is_binary(Status) ->
|
|
|
- Pid ! {{Pid, StreamID}, {headers, Status, response_headers(Headers, Req)}},
|
|
|
+stream_reply(Status, Headers=#{}, Req) when is_integer(Status); is_binary(Status) ->
|
|
|
+ cast({headers, Status, response_headers(Headers, Req)}, Req),
|
|
|
done_replying(Req, headers).
|
|
|
|
|
|
-spec stream_body(resp_body(), fin | nofin, req()) -> ok.
|
|
@@ -872,8 +872,8 @@ stream_body(Data, IsFin, Req=#{has_sent_resp := headers})
|
|
|
stream_body({data, self(), IsFin, Data}, Req).
|
|
|
|
|
|
%% @todo Do we need a timeout?
|
|
|
-stream_body(Msg, #{pid := Pid, streamid := StreamID}) ->
|
|
|
- Pid ! {{Pid, StreamID}, Msg},
|
|
|
+stream_body(Msg, Req=#{pid := Pid}) ->
|
|
|
+ cast(Msg, Req),
|
|
|
receive {data_ack, Pid} -> ok end.
|
|
|
|
|
|
-spec stream_events(cow_sse:event() | [cow_sse:event()], fin | nofin, req()) -> ok.
|
|
@@ -883,9 +883,8 @@ stream_events(Events, IsFin, Req=#{has_sent_resp := headers}) ->
|
|
|
stream_body({data, self(), IsFin, cow_sse:events(Events)}, Req).
|
|
|
|
|
|
-spec stream_trailers(cowboy:http_headers(), req()) -> ok.
|
|
|
-stream_trailers(Trailers, #{pid := Pid, streamid := StreamID, has_sent_resp := headers}) ->
|
|
|
- Pid ! {{Pid, StreamID}, {trailers, Trailers}},
|
|
|
- ok.
|
|
|
+stream_trailers(Trailers, Req=#{has_sent_resp := headers}) ->
|
|
|
+ cast({trailers, Trailers}, Req).
|
|
|
|
|
|
-spec push(iodata(), cowboy:http_headers(), req()) -> ok.
|
|
|
push(Path, Headers, Req) ->
|
|
@@ -895,14 +894,19 @@ push(Path, Headers, Req) ->
|
|
|
%% @todo Path, Headers, Opts, everything should be in proper binary,
|
|
|
%% or normalized when creating the Req object.
|
|
|
-spec push(iodata(), cowboy:http_headers(), req(), push_opts()) -> ok.
|
|
|
-push(Path, Headers, #{pid := Pid, streamid := StreamID,
|
|
|
- scheme := Scheme0, host := Host0, port := Port0}, Opts) ->
|
|
|
+push(Path, Headers, Req=#{scheme := Scheme0, host := Host0, port := Port0}, Opts) ->
|
|
|
Method = maps:get(method, Opts, <<"GET">>),
|
|
|
Scheme = maps:get(scheme, Opts, Scheme0),
|
|
|
Host = maps:get(host, Opts, Host0),
|
|
|
Port = maps:get(port, Opts, Port0),
|
|
|
Qs = maps:get(qs, Opts, <<>>),
|
|
|
- Pid ! {{Pid, StreamID}, {push, Method, Scheme, Host, Port, Path, Qs, Headers}},
|
|
|
+ cast({push, Method, Scheme, Host, Port, Path, Qs, Headers}, Req).
|
|
|
+
|
|
|
+%% Stream handlers.
|
|
|
+
|
|
|
+-spec cast(any(), req()) -> ok.
|
|
|
+cast(Msg, #{pid := Pid, streamid := StreamID}) ->
|
|
|
+ Pid ! {{Pid, StreamID}, Msg},
|
|
|
ok.
|
|
|
|
|
|
%% Internal.
|