%%% @doc %%% Interface to encoder/decoder for postgresql %%% wire-protocol %%% %%% See also `include/protocol.hrl'. %%% @end %%% Copyright (C) 2009 - Will Glozer. All rights reserved. %%% Copyright (C) 2011 - Anton Lebedevich. All rights reserved. -module(epgsql_wire). -export([decode_message/1, decode_error/1, decode_strings/1, decode_columns/3, decode_parameters/2, encode_command/1, encode_command/2, build_decoder/2, decode_data/2, decode_complete/1, encode_types/2, encode_formats/1, format/2, encode_parameters/2, encode_standby_status_update/3]). -export_type([row_decoder/0]). -include("epgsql.hrl"). -include("protocol.hrl"). -opaque row_decoder() :: {[epgsql_binary:decoder()], [epgsql:column()], epgsql_binary:codec()}. %% @doc tries to extract single postgresql packet from TCP stream -spec decode_message(binary()) -> {byte(), binary(), binary()} | binary(). decode_message(<> = Bin) -> Len2 = Len - 4, case Rest of <> -> {Type, Data, Tail}; _Other -> Bin end; decode_message(Bin) -> Bin. %% @doc decode a single null-terminated string -spec decode_string(binary()) -> [binary(), ...]. decode_string(Bin) -> binary:split(Bin, <<0>>). %% @doc decode multiple null-terminated string -spec decode_strings(binary()) -> [binary(), ...]. decode_strings(Bin) -> %% Assert the last byte is what we want it to be %% Remove that byte from the Binary, so the zero %% terminators are separators. Then apply %% binary:split/3 directly on the remaining Subj Sz = byte_size(Bin) - 1, <> = Bin, binary:split(Subj, <<0>>, [global]). %% @doc decode error's field -spec decode_fields(binary()) -> [{byte(), binary()}]. decode_fields(Bin) -> decode_fields(Bin, []). decode_fields(<<0>>, Acc) -> Acc; decode_fields(<>, Acc) -> [Str, Rest2] = decode_string(Rest), decode_fields(Rest2, [{Type, Str} | Acc]). %% @doc decode ErrorResponse %% See [http://www.postgresql.org/docs/current/interactive/protocol-error-fields.html] -spec decode_error(binary()) -> epgsql:query_error(). decode_error(Bin) -> Fields = decode_fields(Bin), ErrCode = proplists:get_value($C, Fields), ErrName = epgsql_errcodes:to_name(ErrCode), {ErrSeverity, Extra} = case proplists:get_value($V, Fields) of undefined -> {proplists:get_value($S, Fields), []}; Severity -> {Severity, [{severity, proplists:get_value($S, Fields)}]} end, Error = #error{ severity = lower_atom(ErrSeverity), code = ErrCode, codename = ErrName, message = proplists:get_value($M, Fields), extra = lists:sort(Extra ++ lists:foldl(fun decode_error_extra/2, [], Fields))}, Error. %% consider updating #error.extra typespec when changing/adding extras decode_error_extra({$D, Val}, Acc) -> [{detail, Val} | Acc]; decode_error_extra({$H, Val}, Acc) -> [{hint, Val} | Acc]; decode_error_extra({$P, Val}, Acc) -> [{position, Val} | Acc]; decode_error_extra({$p, Val}, Acc) -> [{internal_position, Val} | Acc]; decode_error_extra({$q, Val}, Acc) -> [{internal_query, Val} | Acc]; decode_error_extra({$W, Val}, Acc) -> [{where, Val} | Acc]; decode_error_extra({$s, Val}, Acc) -> [{schema_name, Val} | Acc]; decode_error_extra({$t, Val}, Acc) -> [{table_name, Val} | Acc]; decode_error_extra({$c, Val}, Acc) -> [{column_name, Val} | Acc]; decode_error_extra({$d, Val}, Acc) -> [{data_type_name, Val} | Acc]; decode_error_extra({$n, Val}, Acc) -> [{constraint_name, Val} | Acc]; decode_error_extra({$F, Val}, Acc) -> [{file, Val} | Acc]; decode_error_extra({$L, Val}, Acc) -> [{line, Val} | Acc]; decode_error_extra({$R, Val}, Acc) -> [{routine, Val} | Acc]; decode_error_extra({_, _}, Acc) -> Acc. lower_atom(Str) when is_binary(Str) -> lower_atom(binary_to_list(Str)); lower_atom(Str) when is_list(Str) -> list_to_atom(string:to_lower(Str)). %% @doc Build decoder for DataRow -spec build_decoder([epgsql:column()], epgsql_binary:codec()) -> row_decoder(). build_decoder(Columns, Codec) -> Decoders = lists:map( fun(#column{oid = Oid, format = Format}) -> Fmt = case Format of 1 -> binary; 0 -> text end, epgsql_binary:oid_to_decoder(Oid, Fmt, Codec) end, Columns), {Decoders, Columns, Codec}. %% @doc decode row data -spec decode_data(binary(), row_decoder()) -> tuple(). decode_data(Bin, {Decoders, _Columns, Codec}) -> list_to_tuple(decode_data(Bin, Decoders, Codec)). decode_data(_, [], _) -> []; decode_data(<<-1:?int32, Rest/binary>>, [_Dec | Decs], Codec) -> [epgsql_binary:null(Codec) | decode_data(Rest, Decs, Codec)]; decode_data(<>, [Decoder | Decs], Codec) -> [epgsql_binary:decode(Value, Decoder) | decode_data(Rest, Decs, Codec)]. %% @doc decode RowDescription column information -spec decode_columns(non_neg_integer(), binary(), epgsql_binary:codec()) -> [epgsql:column()]. decode_columns(0, _Bin, _Codec) -> []; decode_columns(Count, Bin, Codec) -> [Name, Rest] = decode_string(Bin), <> = Rest, %% TODO: get rid of this 'type' (extra oid_db lookup) Type = epgsql_binary:oid_to_name(TypeOid, Codec), Desc = #column{ name = Name, type = Type, oid = TypeOid, size = Size, modifier = Modifier, format = Format, table_oid = TableOid, table_attr_number = AttribNum}, [Desc | decode_columns(Count - 1, Rest2, Codec)]. %% @doc decode ParameterDescription -spec decode_parameters(binary(), epgsql_binary:codec()) -> [epgsql_oid_db:type_info() | {unknown_oid, epgsql_oid_db:oid()}]. decode_parameters(<<_Count:?int16, Bin/binary>>, Codec) -> [case epgsql_binary:oid_to_info(Oid, Codec) of undefined -> {unknown_oid, Oid}; TypeInfo -> TypeInfo end || <> <= Bin]. %% @doc decode CcommandComplete msg decode_complete(<<"SELECT", 0>>) -> select; decode_complete(<<"SELECT", _/binary>>) -> select; decode_complete(<<"BEGIN", 0>>) -> 'begin'; decode_complete(<<"ROLLBACK", 0>>) -> rollback; decode_complete(Bin) -> [Str, _] = decode_string(Bin), case string:tokens(binary_to_list(Str), " ") of ["INSERT", _Oid, Rows] -> {insert, list_to_integer(Rows)}; ["UPDATE", Rows] -> {update, list_to_integer(Rows)}; ["DELETE", Rows] -> {delete, list_to_integer(Rows)}; ["MOVE", Rows] -> {move, list_to_integer(Rows)}; ["FETCH", Rows] -> {fetch, list_to_integer(Rows)}; [Type | _Rest] -> lower_atom(Type) end. %% @doc encode types encode_types(Types, Codec) -> encode_types(Types, 0, <<>>, Codec). encode_types([], Count, Acc, _Codec) -> <>; encode_types([Type | T], Count, Acc, Codec) -> Oid = case Type of undefined -> 0; _Any -> epgsql_binary:type_to_oid(Type, Codec) end, encode_types(T, Count + 1, <>, Codec). %% @doc encode expected column formats -spec encode_formats([epgsql:column()]) -> binary(). encode_formats(Columns) -> encode_formats(Columns, 0, <<>>). encode_formats([], Count, Acc) -> <>; encode_formats([#column{format = Format} | T], Count, Acc) -> encode_formats(T, Count + 1, <>). %% @doc Returns 1 if Codec knows how to decode binary format of the type provided and 0 otherwise format({unknown_oid, _}, _) -> 0; format(#column{oid = Oid}, Codec) -> case epgsql_binary:supports(Oid, Codec) of true -> 1; %binary false -> 0 %text end. %% @doc encode parameters for 'Bind' -spec encode_parameters([], epgsql_binary:codec()) -> iolist(). encode_parameters(Parameters, Codec) -> encode_parameters(Parameters, 0, <<>>, [], Codec). encode_parameters([], Count, Formats, Values, _Codec) -> [<>, Formats, <> | lists:reverse(Values)]; encode_parameters([P | T], Count, Formats, Values, Codec) -> {Format, Value} = encode_parameter(P, Codec), Formats2 = <>, Values2 = [Value | Values], encode_parameters(T, Count + 1, Formats2, Values2, Codec). %% @doc encode single 'typed' parameter -spec encode_parameter({Type, Val :: any()}, epgsql_binary:codec()) -> {0..1, iolist()} when Type :: epgsql:type_name() | {array, epgsql:type_name()} | {unknown_oid, epgsql_oid_db:oid()}. encode_parameter({Type, Value}, Codec) -> case epgsql_binary:is_null(Value, Codec) of false -> encode_parameter(Type, Value, Codec); true -> {1, <<-1:?int32>>} end. encode_parameter({unknown_oid, _Oid}, Value, _Codec) -> {0, encode_text(Value)}; encode_parameter(Type, Value, Codec) -> {1, epgsql_binary:encode(Type, Value, Codec)}. encode_text(B) when is_binary(B) -> encode_bin(B); encode_text(A) when is_atom(A) -> encode_bin(atom_to_binary(A, utf8)); encode_text(I) when is_integer(I) -> encode_bin(integer_to_binary(I)); encode_text(F) when is_float(F) -> encode_bin(float_to_binary(F)); encode_text(L) when is_list(L) -> encode_bin(list_to_binary(L)). encode_bin(Bin) -> <<(byte_size(Bin)):?int32, Bin/binary>>. %% @doc Encode iodata with size-prefix (used for `StartupMessage' and `SSLRequest' packets) encode_command(Data) -> Size = iolist_size(Data), [<<(Size + 4):?int32>> | Data]. %% @doc Encode PG command with type and size prefix encode_command(Type, Data) -> Size = iolist_size(Data), [<> | Data]. %% @doc encode replication status message encode_standby_status_update(ReceivedLSN, FlushedLSN, AppliedLSN) -> {MegaSecs, Secs, MicroSecs} = os:timestamp(), %% microseconds since midnight on 2000-01-01 Timestamp = ((MegaSecs * 1000000 + Secs) * 1000000 + MicroSecs) - 946684800*1000000, <<$r:8, ReceivedLSN:?int64, FlushedLSN:?int64, AppliedLSN:?int64, Timestamp:?int64, 0:8>>.