epgsql_wire.erl 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
  1. %%% @doc
  2. %%% Interface to encoder/decoder for postgresql
  3. %%% <a href="https://www.postgresql.org/docs/current/protocol-message-formats.html">wire-protocol</a>
  4. %%%
  5. %%% See also `include/protocol.hrl'.
  6. %%% @end
  7. %%% Copyright (C) 2009 - Will Glozer. All rights reserved.
  8. %%% Copyright (C) 2011 - Anton Lebedevich. All rights reserved.
  9. -module(epgsql_wire).
  10. -export([decode_message/1,
  11. decode_error/1,
  12. decode_strings/1,
  13. decode_columns/3,
  14. decode_parameters/2,
  15. encode_command/1,
  16. encode_command/2,
  17. build_decoder/2,
  18. decode_data/2,
  19. decode_complete/1,
  20. encode_types/2,
  21. encode_formats/1,
  22. format/2,
  23. encode_parameters/2,
  24. encode_standby_status_update/3]).
  25. -export_type([row_decoder/0]).
  26. -include("epgsql.hrl").
  27. -include("protocol.hrl").
  28. -opaque row_decoder() :: {[epgsql_binary:decoder()], [epgsql:column()], epgsql_binary:codec()}.
  %% @doc Tries to split one complete PostgreSQL packet off the front of a TCP
  %% stream buffer.
  %%
  %% Returns `{Type, Payload, Tail}' when the buffer holds the type byte, the
  %% length field and the whole payload. Otherwise the buffer is returned
  %% unchanged so the caller can retry once more bytes have arrived.
  -spec decode_message(binary()) -> {byte(), binary(), binary()} | binary().
  decode_message(<<Type:8, Len:?int32, Rest/binary>> = Bin) ->
      %% The length field counts its own 4 bytes, so the payload is 4 bytes shorter.
      PayloadLen = Len - 4,
      case Rest of
          <<Payload:PayloadLen/binary, Tail/binary>> ->
              {Type, Payload, Tail};
          _Incomplete ->
              Bin
      end;
  decode_message(Bin) ->
      %% Not even a full header yet.
      Bin.
  %% @doc Splits a binary at its first zero byte.
  %%
  %% Returns `[String, Rest]' when a terminator is present. Without one,
  %% `binary:split/2' returns the input as a single-element list.
  -spec decode_string(binary()) -> [binary(), ...].
  decode_string(Bin) ->
      Terminator = <<0>>,
      binary:split(Bin, Terminator).
  %% @doc Decodes a sequence of null-terminated strings.
  -spec decode_strings(binary()) -> [binary(), ...].
  decode_strings(Bin) ->
      %% The final byte must be the terminator of the last string; the match
      %% below crashes otherwise. Dropping it turns the remaining zero bytes
      %% into plain separators, so one global split yields every string.
      BodyLen = byte_size(Bin) - 1,
      <<Body:BodyLen/binary, 0>> = Bin,
      binary:split(Body, <<0>>, [global]).
  %% @doc Decodes the fields of an error message body.
  %%
  %% Each field is a one-byte code followed by a null-terminated value; a lone
  %% zero byte ends the list. Fields are returned in reverse order of
  %% appearance, because each one is prepended to the accumulator.
  -spec decode_fields(binary()) -> [{byte(), binary()}].
  decode_fields(Bin) ->
      decode_fields(Bin, []).

  decode_fields(<<0>>, Fields) ->
      Fields;
  decode_fields(<<Code:8, Payload/binary>>, Fields) ->
      [Value, Remaining] = decode_string(Payload),
      decode_fields(Remaining, [{Code, Value} | Fields]).
  %% @doc Decodes an ErrorResponse body into an `#error{}' record.
  %% See [http://www.postgresql.org/docs/current/interactive/protocol-error-fields.html]
  %%
  %% When a `V' field is present it supplies the record's severity and the `S'
  %% field is kept under `severity' in `extra'. Otherwise `S' supplies the
  %% severity itself. `extra' is returned sorted.
  -spec decode_error(binary()) -> epgsql:query_error().
  decode_error(Bin) ->
      Fields = decode_fields(Bin),
      Code = proplists:get_value($C, Fields),
      CodeName = epgsql_errcodes:to_name(Code),
      SeverityS = proplists:get_value($S, Fields),
      {Severity, SeverityExtra} =
          case proplists:get_value($V, Fields) of
              undefined -> {SeverityS, []};
              SeverityV -> {SeverityV, [{severity, SeverityS}]}
          end,
      OtherExtra = lists:foldl(fun decode_error_extra/2, [], Fields),
      #error{
          severity = lower_atom(Severity),
          code = Code,
          codename = CodeName,
          message = proplists:get_value($M, Fields),
          extra = lists:sort(SeverityExtra ++ OtherExtra)}.
  %% consider updating #error.extra typespec when changing/adding extras
  %%
  %% Fold step over decoded error fields: prepends `{Name, Val}' to the
  %% accumulator when the field code is one of the known extras and leaves the
  %% accumulator untouched for any other code.
  decode_error_extra({Code, Val}, Acc) ->
      case error_extra_name(Code) of
          undefined -> Acc;
          Name -> [{Name, Val} | Acc]
      end.

  %% Maps an error field code to the key used in `#error.extra', or `undefined'
  %% when the code is not reported as an extra.
  error_extra_name($D) -> detail;
  error_extra_name($H) -> hint;
  error_extra_name($P) -> position;
  error_extra_name($p) -> internal_position;
  error_extra_name($q) -> internal_query;
  error_extra_name($W) -> where;
  error_extra_name($s) -> schema_name;
  error_extra_name($t) -> table_name;
  error_extra_name($c) -> column_name;
  error_extra_name($d) -> data_type_name;
  error_extra_name($n) -> constraint_name;
  error_extra_name($F) -> file;
  error_extra_name($L) -> line;
  error_extra_name($R) -> routine;
  error_extra_name(_) -> undefined.
  %% Converts a binary or a character list to a lower-cased atom.
  %% NOTE(review): atoms are created dynamically with list_to_atom/1, and the
  %% callers in this module pass text taken from server messages.
  lower_atom(Str) when is_list(Str) ->
      list_to_atom(string:to_lower(Str));
  lower_atom(Str) when is_binary(Str) ->
      lower_atom(binary_to_list(Str)).
  %% @doc Builds the row decoder used for DataRow messages.
  %%
  %% The result bundles one decoder per column (in column order) with the
  %% columns themselves and the codec.
  -spec build_decoder([epgsql:column()], epgsql_binary:codec()) -> row_decoder().
  build_decoder(Columns, Codec) ->
      Decoders = [column_decoder(Column, Codec) || Column <- Columns],
      {Decoders, Columns, Codec}.

  %% Picks the decoder for a single column from its type OID and format code
  %% (0 is text, 1 is binary).
  column_decoder(#column{oid = Oid, format = FormatCode}, Codec) ->
      Format = case FormatCode of
                   0 -> text;
                   1 -> binary
               end,
      epgsql_binary:oid_to_decoder(Oid, Format, Codec).
  %% @doc Decodes the column values of one row into a tuple, using a decoder
  %% built by `build_decoder/2'.
  -spec decode_data(binary(), row_decoder()) -> tuple().
  decode_data(Bin, {Decoders, _Columns, Codec}) ->
      list_to_tuple(decode_data(Bin, Decoders, Codec)).

  decode_data(Bin, Decoders, Codec) ->
      decode_data(Bin, Decoders, Codec, []).

  %% Walks values and decoders in lock-step. Each value is prefixed by its
  %% length; a length of -1 marks NULL and carries no value bytes. Decoding
  %% stops as soon as the decoders run out, whatever is left in the binary.
  decode_data(_Bin, [], _Codec, Acc) ->
      lists:reverse(Acc);
  decode_data(<<-1:?int32, Rest/binary>>, [_Decoder | Decoders], Codec, Acc) ->
      Null = epgsql_binary:null(Codec),
      decode_data(Rest, Decoders, Codec, [Null | Acc]);
  decode_data(<<Len:?int32, Value:Len/binary, Rest/binary>>, [Decoder | Decoders], Codec, Acc) ->
      Decoded = epgsql_binary:decode(Value, Decoder),
      decode_data(Rest, Decoders, Codec, [Decoded | Acc]).
  %% @doc Decodes `Count' column descriptions from a RowDescription body.
  %%
  %% Each description is a null-terminated name followed by fixed-width fields:
  %% table OID, attribute number, type OID, size, modifier and format code.
  -spec decode_columns(non_neg_integer(), binary(), epgsql_binary:codec()) -> [epgsql:column()].
  decode_columns(Count, Bin, Codec) ->
      decode_columns(Count, Bin, Codec, []).

  decode_columns(0, _Bin, _Codec, Acc) ->
      lists:reverse(Acc);
  decode_columns(Remaining, Bin, Codec, Acc) ->
      [Name, AfterName] = decode_string(Bin),
      <<TableOid:?int32, AttrNum:?int16, TypeOid:?int32,
        Size:?int16, Modifier:?int32, Format:?int16, Tail/binary>> = AfterName,
      %% TODO: get rid of this 'type' (extra oid_db lookup)
      TypeName = epgsql_binary:oid_to_name(TypeOid, Codec),
      Column = #column{
          name = Name,
          type = TypeName,
          oid = TypeOid,
          size = Size,
          modifier = Modifier,
          format = Format,
          table_oid = TableOid,
          table_attr_number = AttrNum},
      decode_columns(Remaining - 1, Tail, Codec, [Column | Acc]).
  %% @doc Decodes a ParameterDescription body.
  %%
  %% The leading count is skipped; one entry is produced per OID found in the
  %% rest of the binary. OIDs the codec has no info for are returned as
  %% `{unknown_oid, Oid}'.
  -spec decode_parameters(binary(), epgsql_binary:codec()) ->
      [epgsql_oid_db:type_info() | {unknown_oid, epgsql_oid_db:oid()}].
  decode_parameters(<<_Count:?int16, Oids/binary>>, Codec) ->
      [parameter_info(Oid, Codec) || <<Oid:?int32>> <= Oids].

  %% Looks up the type info for one parameter OID.
  parameter_info(Oid, Codec) ->
      case epgsql_binary:oid_to_info(Oid, Codec) of
          undefined -> {unknown_oid, Oid};
          TypeInfo -> TypeInfo
      end.
  %% @doc Decodes the command tag of a CommandComplete message.
  %%
  %% Any tag starting with `SELECT' yields `select'. `INSERT', `UPDATE',
  %% `DELETE', `MOVE' and `FETCH' yield `{Command, RowCount}'. Every other tag
  %% yields its first word as a lower-cased atom.
  decode_complete(<<"SELECT", _/binary>>) -> select;
  decode_complete(<<"BEGIN", 0>>) -> 'begin';
  decode_complete(<<"ROLLBACK", 0>>) -> rollback;
  decode_complete(Bin) ->
      [Tag, _] = decode_string(Bin),
      case string:tokens(binary_to_list(Tag), " ") of
          ["INSERT", _Oid, Count] ->
              {insert, list_to_integer(Count)};
          [Command, Count] when Command =:= "UPDATE";
                                Command =:= "DELETE";
                                Command =:= "MOVE";
                                Command =:= "FETCH" ->
              {lower_atom(Command), list_to_integer(Count)};
          [Command | _Rest] ->
              lower_atom(Command)
      end.
  %% @doc Encodes a list of types as an `?int16' count followed by one `?int32'
  %% OID per type, in input order.
  encode_types(Types, Codec) ->
      Oids = << <<(type_oid(Type, Codec)):?int32>> || Type <- Types >>,
      <<(length(Types)):?int16, Oids/binary>>.

  %% `undefined' is sent as OID 0; any other type is resolved through the codec.
  type_oid(undefined, _Codec) -> 0;
  type_oid(Type, Codec) -> epgsql_binary:type_to_oid(Type, Codec).
  %% @doc Encodes the expected column formats as an `?int16' count followed by
  %% one `?int16' format code per column, in column order.
  -spec encode_formats([epgsql:column()]) -> binary().
  encode_formats(Columns) ->
      Formats = << <<(Column#column.format):?int16>> || Column <- Columns >>,
      <<(length(Columns)):?int16, Formats/binary>>.
  %% @doc Returns the format code to request for a value: 1 (binary) when the
  %% codec supports the column's type OID, 0 (text) otherwise. Unknown OIDs
  %% always get 0.
  format({unknown_oid, _}, _) -> 0;
  format(#column{oid = Oid}, Codec) ->
      BinarySupported = epgsql_binary:supports(Oid, Codec),
      case BinarySupported of
          false -> 0; % text
          true -> 1   % binary
      end.
  %% @doc Encodes the parameters section of a 'Bind' message.
  %%
  %% `Parameters' is a list of `{Type, Value}' pairs. The result is an iolist
  %% made of the parameter count, one format code per parameter, the parameter
  %% count again, and then the encoded values in input order.
  -spec encode_parameters([{Type, Val :: any()}], epgsql_binary:codec()) -> iolist() when
        Type :: epgsql:type_name()
              | {array, epgsql:type_name()}
              | {unknown_oid, epgsql_oid_db:oid()}.
  encode_parameters(Parameters, Codec) ->
      encode_parameters(Parameters, 0, <<>>, [], Codec).

  encode_parameters([], Count, Formats, Values, _Codec) ->
      %% Values were accumulated in reverse; restore input order.
      [<<Count:?int16>>, Formats, <<Count:?int16>> | lists:reverse(Values)];
  encode_parameters([P | T], Count, Formats, Values, Codec) ->
      {Format, Value} = encode_parameter(P, Codec),
      Formats2 = <<Formats/binary, Format:?int16>>,
      Values2 = [Value | Values],
      encode_parameters(T, Count + 1, Formats2, Values2, Codec).
  %% @doc Encodes one typed parameter as `{FormatCode, EncodedValue}'.
  %%
  %% Values the codec treats as NULL are sent with format 1 and a length of -1.
  %% Parameters of an unknown OID are sent as text (format 0). Everything else
  %% is encoded by the codec and sent as binary (format 1).
  -spec encode_parameter({Type, Val :: any()},
                         epgsql_binary:codec()) -> {0..1, iolist()} when
        Type :: epgsql:type_name()
              | {array, epgsql:type_name()}
              | {unknown_oid, epgsql_oid_db:oid()}.
  encode_parameter({Type, Value}, Codec) ->
      case epgsql_binary:is_null(Value, Codec) of
          true ->
              %% NULL: length -1 and no value bytes.
              {1, <<-1:?int32>>};
          false ->
              encode_parameter(Type, Value, Codec)
      end.

  encode_parameter({unknown_oid, _Oid}, Value, _Codec) ->
      TextValue = encode_text(Value),
      {0, TextValue};
  encode_parameter(Type, Value, Codec) ->
      BinaryValue = epgsql_binary:encode(Type, Value, Codec),
      {1, BinaryValue}.
  %% Encodes a value in text format: the value is rendered as a binary and
  %% then length-prefixed by encode_bin/1.
  encode_text(Value) ->
      encode_bin(text_to_binary(Value)).

  %% Renders a binary, list, integer, float or atom as a binary.
  text_to_binary(B) when is_binary(B) -> B;
  text_to_binary(L) when is_list(L) -> list_to_binary(L);
  text_to_binary(I) when is_integer(I) -> integer_to_binary(I);
  text_to_binary(F) when is_float(F) -> float_to_binary(F);
  text_to_binary(A) when is_atom(A) -> atom_to_binary(A, utf8).
  %% Prefixes a binary with its byte length as an `?int32'.
  encode_bin(Bin) ->
      Len = byte_size(Bin),
      <<Len:?int32, Bin/binary>>.
  %% @doc Encode iodata with size-prefix (used for `StartupMessage' and `SSLRequest' packets)
  encode_command(Data) ->
      %% The size prefix counts its own 4 bytes as well as the payload.
      TotalSize = iolist_size(Data) + 4,
      [<<TotalSize:?int32>> | Data].
  %% @doc Encode PG command with a one-byte type tag and size prefix
  encode_command(Type, Data) ->
      %% The size covers the 4-byte size field and the payload, not the type byte.
      TotalSize = iolist_size(Data) + 4,
      [<<Type:8, TotalSize:?int32>> | Data].
  %% @doc Encodes a replication standby status update message.
  %%
  %% Layout: the byte `r', the received, flushed and applied LSNs, the current
  %% time in microseconds since midnight on 2000-01-01, and a trailing zero
  %% byte (presumably the "reply requested" flag; confirm against the protocol
  %% documentation).
  encode_standby_status_update(ReceivedLSN, FlushedLSN, AppliedLSN) ->
      {MegaSecs, Secs, MicroSecs} = os:timestamp(),
      UnixMicros = (MegaSecs * 1000000 + Secs) * 1000000 + MicroSecs,
      %% 946684800 s separate the Unix epoch from midnight on 2000-01-01.
      EpochOffsetMicros = 946684800 * 1000000,
      Timestamp = UnixMicros - EpochOffsetMicros,
      <<$r:8, ReceivedLSN:?int64, FlushedLSN:?int64, AppliedLSN:?int64, Timestamp:?int64, 0:8>>.