epgsql_wire.erl 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. %%% Copyright (C) 2009 - Will Glozer. All rights reserved.
  2. %%% Copyright (C) 2011 - Anton Lebedevich. All rights reserved.
  3. -module(epgsql_wire).
  4. -export([decode_message/1,
  5. decode_error/1,
  6. decode_strings/1,
  7. decode_columns/3,
  8. decode_parameters/2,
  9. encode/1,
  10. encode/2,
  11. build_decoder/2,
  12. decode_data/2,
  13. decode_complete/1,
  14. encode_types/2,
  15. encode_formats/1,
  16. format/2,
  17. encode_parameters/2,
  18. encode_standby_status_update/3]).
  19. -include("epgsql.hrl").
  20. -include("epgsql_binary.hrl").
  21. decode_message(<<Type:8, Len:?int32, Rest/binary>> = Bin) ->
  22. Len2 = Len - 4,
  23. case Rest of
  24. <<Data:Len2/binary, Tail/binary>> ->
  25. {Type, Data, Tail};
  26. _Other ->
  27. Bin
  28. end;
  29. decode_message(Bin) ->
  30. Bin.
  31. %% decode a single null-terminated string
  32. decode_string(Bin) ->
  33. binary:split(Bin, <<0>>).
  34. %% decode multiple null-terminated string
  35. decode_strings(Bin) ->
  36. [<<>> | T] = lists:reverse(binary:split(Bin, <<0>>, [global])),
  37. lists:reverse(T).
  38. %% decode field
  39. decode_fields(Bin) ->
  40. decode_fields(Bin, []).
  41. decode_fields(<<0>>, Acc) ->
  42. Acc;
  43. decode_fields(<<Type:8, Rest/binary>>, Acc) ->
  44. [Str, Rest2] = decode_string(Rest),
  45. decode_fields(Rest2, [{Type, Str} | Acc]).
  46. %% decode ErrorResponse
  47. %% See http://www.postgresql.org/docs/current/interactive/protocol-error-fields.html
  48. decode_error(Bin) ->
  49. Fields = decode_fields(Bin),
  50. ErrCode = proplists:get_value($C, Fields),
  51. ErrName = epgsql_errcodes:to_name(ErrCode),
  52. {ErrSeverity, Extra} = case proplists:get_value($V, Fields) of
  53. undefined ->
  54. {proplists:get_value($S, Fields), []};
  55. Severity ->
  56. {Severity, [{severity, proplists:get_value($S, Fields)}]}
  57. end,
  58. Error = #error{
  59. severity = lower_atom(ErrSeverity),
  60. code = ErrCode,
  61. codename = ErrName,
  62. message = proplists:get_value($M, Fields),
  63. extra = lists:sort(Extra ++ lists:foldl(fun decode_error_extra/2, [], Fields))},
  64. Error.
  65. %% consider updating #error.extra typespec when changing/adding extras
  66. decode_error_extra({$D, Val}, Acc) ->
  67. [{detail, Val} | Acc];
  68. decode_error_extra({$H, Val}, Acc) ->
  69. [{hint, Val} | Acc];
  70. decode_error_extra({$P, Val}, Acc) ->
  71. [{position, Val} | Acc];
  72. decode_error_extra({$p, Val}, Acc) ->
  73. [{internal_position, Val} | Acc];
  74. decode_error_extra({$q, Val}, Acc) ->
  75. [{internal_query, Val} | Acc];
  76. decode_error_extra({$W, Val}, Acc) ->
  77. [{where, Val} | Acc];
  78. decode_error_extra({$s, Val}, Acc) ->
  79. [{schema_name, Val} | Acc];
  80. decode_error_extra({$t, Val}, Acc) ->
  81. [{table_name, Val} | Acc];
  82. decode_error_extra({$c, Val}, Acc) ->
  83. [{column_name, Val} | Acc];
  84. decode_error_extra({$d, Val}, Acc) ->
  85. [{data_type_name, Val} | Acc];
  86. decode_error_extra({$n, Val}, Acc) ->
  87. [{constraint_name, Val} | Acc];
  88. decode_error_extra({$F, Val}, Acc) ->
  89. [{file, Val} | Acc];
  90. decode_error_extra({$L, Val}, Acc) ->
  91. [{line, Val} | Acc];
  92. decode_error_extra({$R, Val}, Acc) ->
  93. [{routine, Val} | Acc];
  94. decode_error_extra({_, _}, Acc) ->
  95. Acc.
  96. lower_atom(Str) when is_binary(Str) ->
  97. lower_atom(binary_to_list(Str));
  98. lower_atom(Str) when is_list(Str) ->
  99. list_to_atom(string:to_lower(Str)).
  100. %% FIXME: return iolist
  101. encode(Data) ->
  102. Bin = iolist_to_binary(Data),
  103. <<(byte_size(Bin) + 4):?int32, Bin/binary>>.
  104. encode(Type, Data) ->
  105. Bin = iolist_to_binary(Data),
  106. <<Type:8, (byte_size(Bin) + 4):?int32, Bin/binary>>.
  107. %% Build decoder for DataRow
  108. build_decoder(Columns, Codec) ->
  109. {Columns, Codec}.
  110. %% decode row data
  111. %% FIXME: use body recursion
  112. decode_data(Bin, {Columns, Codec}) ->
  113. decode_data(Columns, Bin, [], Codec).
  114. decode_data([], _Bin, Acc, _Codec) ->
  115. list_to_tuple(lists:reverse(Acc));
  116. decode_data([_C | T], <<-1:?int32, Rest/binary>>, Acc, Codec) ->
  117. decode_data(T, Rest, [null | Acc], Codec);
  118. decode_data([C | T], <<Len:?int32, Value:Len/binary, Rest/binary>>, Acc, Codec) ->
  119. Value2 = case C of
  120. #column{type = Type, format = 1} ->
  121. epgsql_binary:decode(Type, Value, Codec);
  122. #column{} ->
  123. Value
  124. end,
  125. decode_data(T, Rest, [Value2 | Acc], Codec).
  126. %% decode column information
  127. %% TODO: use body-recursion
  128. decode_columns(Count, Bin, Codec) ->
  129. decode_columns(Count, Bin, [], Codec).
  130. decode_columns(0, _Bin, Acc, _Codec) ->
  131. lists:reverse(Acc);
  132. decode_columns(N, Bin, Acc, Codec) ->
  133. [Name, Rest] = decode_string(Bin),
  134. <<_Table_Oid:?int32, _Attrib_Num:?int16, Type_Oid:?int32,
  135. Size:?int16, Modifier:?int32, Format:?int16, Rest2/binary>> = Rest,
  136. Desc = #column{
  137. name = Name,
  138. type = epgsql_binary:oid2type(Type_Oid, Codec),
  139. size = Size,
  140. modifier = Modifier,
  141. format = Format},
  142. decode_columns(N - 1, Rest2, [Desc | Acc], Codec).
  143. %% decode ParameterDescription
  144. decode_parameters(<<_Count:?int16, Bin/binary>>, Codec) ->
  145. [epgsql_binary:oid2type(Oid, Codec) || <<Oid:?int32>> <= Bin].
  146. %% decode command complete msg
  147. decode_complete(<<"SELECT", 0>>) -> select;
  148. decode_complete(<<"SELECT", _/binary>>) -> select;
  149. decode_complete(<<"BEGIN", 0>>) -> 'begin';
  150. decode_complete(<<"ROLLBACK", 0>>) -> rollback;
  151. decode_complete(Bin) ->
  152. [Str, _] = decode_string(Bin),
  153. case string:tokens(binary_to_list(Str), " ") of
  154. ["INSERT", _Oid, Rows] -> {insert, list_to_integer(Rows)};
  155. ["UPDATE", Rows] -> {update, list_to_integer(Rows)};
  156. ["DELETE", Rows] -> {delete, list_to_integer(Rows)};
  157. ["MOVE", Rows] -> {move, list_to_integer(Rows)};
  158. ["FETCH", Rows] -> {fetch, list_to_integer(Rows)};
  159. [Type | _Rest] -> lower_atom(Type)
  160. end.
  161. %% encode types
  162. encode_types(Types, Codec) ->
  163. encode_types(Types, 0, <<>>, Codec).
  164. encode_types([], Count, Acc, _Codec) ->
  165. <<Count:?int16, Acc/binary>>;
  166. encode_types([Type | T], Count, Acc, Codec) ->
  167. Oid = case Type of
  168. undefined -> 0;
  169. _Any -> epgsql_binary:type2oid(Type, Codec)
  170. end,
  171. encode_types(T, Count + 1, <<Acc/binary, Oid:?int32>>, Codec).
  172. %% encode column formats
  173. encode_formats(Columns) ->
  174. encode_formats(Columns, 0, <<>>).
  175. encode_formats([], Count, Acc) ->
  176. <<Count:?int16, Acc/binary>>;
  177. encode_formats([#column{format = Format} | T], Count, Acc) ->
  178. encode_formats(T, Count + 1, <<Acc/binary, Format:?int16>>).
  179. format(Type, _Codec) ->
  180. case epgsql_binary:supports(Type) of
  181. true -> 1;
  182. false -> 0
  183. end.
  184. %% encode parameters
  185. encode_parameters(Parameters, Codec) ->
  186. encode_parameters(Parameters, 0, <<>>, <<>>, Codec).
  187. encode_parameters([], Count, Formats, Values, _Codec) ->
  188. <<Count:?int16, Formats/binary, Count:?int16, Values/binary>>;
  189. encode_parameters([P | T], Count, Formats, Values, Codec) ->
  190. {Format, Value} = encode_parameter(P, Codec),
  191. Formats2 = <<Formats/binary, Format:?int16>>,
  192. Values2 = <<Values/binary, Value/binary>>,
  193. encode_parameters(T, Count + 1, Formats2, Values2, Codec).
  194. %% encode parameter
  195. encode_parameter({Type, Value}, Codec) ->
  196. case epgsql_binary:encode(Type, Value, Codec) of
  197. Bin when is_binary(Bin) -> {1, Bin};
  198. {error, unsupported} -> encode_parameter(Value)
  199. end;
  200. encode_parameter(Value, _Codec) -> encode_parameter(Value).
  201. encode_parameter(A) when is_atom(A) -> {0, encode_list(atom_to_list(A))};
  202. encode_parameter(B) when is_binary(B) -> {0, <<(byte_size(B)):?int32, B/binary>>};
  203. encode_parameter(I) when is_integer(I) -> {0, encode_list(integer_to_list(I))};
  204. encode_parameter(F) when is_float(F) -> {0, encode_list(float_to_list(F))};
  205. encode_parameter(L) when is_list(L) -> {0, encode_list(L)}.
  206. encode_list(L) ->
  207. Bin = list_to_binary(L),
  208. <<(byte_size(Bin)):?int32, Bin/binary>>.
  209. encode_standby_status_update(ReceivedLSN, FlushedLSN, AppliedLSN) ->
  210. {MegaSecs, Secs, MicroSecs} = os:timestamp(),
  211. Timestamp = ((MegaSecs * 1000000 + Secs) * 1000000 + MicroSecs) - 946684800*1000000, %% microseconds since midnight on 2000-01-01
  212. <<$r:8, ReceivedLSN:?int64, FlushedLSN:?int64, AppliedLSN:?int64, Timestamp:?int64, 0:8>>.