Просмотр исходного кода

Add support for types with dynamic oids

bullno1 11 лет назад
Родитель
Сommit
85d58854b4
5 измененных файлов с 172 добавлено и 124 удалено
  1. 11 0
      src/pgsql.erl
  2. 110 77
      src/pgsql_binary.erl
  3. 17 11
      src/pgsql_sock.erl
  4. 0 4
      src/pgsql_types.erl
  5. 34 32
      src/pgsql_wire.erl

+ 11 - 0
src/pgsql.erl

@@ -16,6 +16,7 @@
          close/2, close/3,
          sync/1,
          cancel/1,
+         update_type_cache/1,
          with_transaction/2,
          sync_on_error/2]).
 
@@ -76,11 +77,21 @@ connect(C, Host, Username, Password, Opts) ->
                          {connect, Host, Username, Password, Opts},
                          infinity) of
         connected ->
+            update_type_cache(C),
             {ok, C};
         Error = {error, _} ->
             Error
     end.
 
+-spec update_type_cache(connection()) -> ok.
+update_type_cache(C) ->
+    DynamicTypes = [<<"hstore">>],
+    Query = "SELECT typname, oid::int4, typarray::int4"
+            " FROM pg_type"
+            " WHERE typname = ANY($1::varchar[])",
+    {ok, _, TypeInfos} = equery(C, Query, [DynamicTypes]),
+    ok = gen_server:call(C, {update_type_cache, TypeInfos}).
+
 -spec close(connection()) -> ok.
 close(C) ->
     pgsql_sock:close(C).

+ 110 - 77
src/pgsql_binary.erl

@@ -2,79 +2,112 @@
 
 -module(pgsql_binary).
 
--export([encode/2, decode/2, supports/1]).
+-export([new_codec/1,
+         update_type_cache/2,
+         type2oid/2, oid2type/2,
+         encode/3, decode/3, supports/1]).
+
+-record(codec, {
+    type2oid = [],
+    oid2type = []
+}).
 
 -include("pgsql_binary.hrl").
 
 -define(datetime, (get(datetime_mod))).
 
-encode(_Any, null)                          -> <<-1:?int32>>;
-encode(bool, true)                          -> <<1:?int32, 1:1/big-signed-unit:8>>;
-encode(bool, false)                         -> <<1:?int32, 0:1/big-signed-unit:8>>;
-encode(int2, N)                             -> <<2:?int32, N:1/big-signed-unit:16>>;
-encode(int4, N)                             -> <<4:?int32, N:1/big-signed-unit:32>>;
-encode(int8, N)                             -> <<8:?int32, N:1/big-signed-unit:64>>;
-encode(float4, N)                           -> <<4:?int32, N:1/big-float-unit:32>>;
-encode(float8, N)                           -> <<8:?int32, N:1/big-float-unit:64>>;
-encode(bpchar, C) when is_integer(C)        -> <<1:?int32, C:1/big-unsigned-unit:8>>;
-encode(bpchar, B) when is_binary(B)         -> <<(byte_size(B)):?int32, B/binary>>;
-encode(time = Type, B)                      -> ?datetime:encode(Type, B);
-encode(timetz = Type, B)                    -> ?datetime:encode(Type, B);
-encode(date = Type, B)                      -> ?datetime:encode(Type, B);
-encode(timestamp = Type, B)                 -> ?datetime:encode(Type, B);
-encode(timestamptz = Type, B)               -> ?datetime:encode(Type, B);
-encode(interval = Type, B)                  -> ?datetime:encode(Type, B);
-encode(bytea, B) when is_binary(B)          -> <<(byte_size(B)):?int32, B/binary>>;
-encode(text, B) when is_binary(B)           -> <<(byte_size(B)):?int32, B/binary>>;
-encode(varchar, B) when is_binary(B)        -> <<(byte_size(B)):?int32, B/binary>>;
-encode(uuid, B) when is_binary(B)           -> encode_uuid(B);
-encode(hstore, {L}) when is_list(L)         -> encode_hstore(L);
-encode({array, char}, L) when is_list(L)    -> encode_array(bpchar, L);
-encode({array, Type}, L) when is_list(L)    -> encode_array(Type, L);
-encode(Type, L) when is_list(L)             -> encode(Type, list_to_binary(L));
-encode(_Type, _Value)                       -> {error, unsupported}.
-
-decode(bool, <<1:1/big-signed-unit:8>>)     -> true;
-decode(bool, <<0:1/big-signed-unit:8>>)     -> false;
-decode(bpchar, <<C:1/big-unsigned-unit:8>>) -> C;
-decode(int2, <<N:1/big-signed-unit:16>>)    -> N;
-decode(int4, <<N:1/big-signed-unit:32>>)    -> N;
-decode(int8, <<N:1/big-signed-unit:64>>)    -> N;
-decode(float4, <<N:1/big-float-unit:32>>)   -> N;
-decode(float8, <<N:1/big-float-unit:64>>)   -> N;
-decode(record, <<_:?int32, Rest/binary>>)   -> list_to_tuple(decode_record(Rest, []));
-decode(time = Type, B)                      -> ?datetime:decode(Type, B);
-decode(timetz = Type, B)                    -> ?datetime:decode(Type, B);
-decode(date = Type, B)                      -> ?datetime:decode(Type, B);
-decode(timestamp = Type, B)                 -> ?datetime:decode(Type, B);
-decode(timestamptz = Type, B)               -> ?datetime:decode(Type, B);
-decode(interval = Type, B)                  -> ?datetime:decode(Type, B);
-decode(uuid, B)                             -> decode_uuid(B);
-decode(hstore, Hstore)                      -> decode_hstore(Hstore);
-decode({array, _Type}, B)                   -> decode_array(B);
-decode(_Other, Bin)                         -> Bin.
-
-encode_array(Type, A) ->
-    {Data, {NDims, Lengths}} = encode_array(Type, A, 0, []),
-    Oid  = pgsql_types:type2oid(Type),
+new_codec([]) -> #codec{}.
+
+update_type_cache(TypeInfos, Codec) ->
+    Type2Oid = lists:flatmap(
+        fun({NameBin, ElementOid, ArrayOid}) ->
+            Name = erlang:binary_to_atom(NameBin, utf8),
+            [{Name, ElementOid}, {{array, Name}, ArrayOid}]
+        end,
+        TypeInfos),
+    Oid2Type = [{Oid, Type} || {Type, Oid} <- Type2Oid],
+    Codec#codec{type2oid = Type2Oid, oid2type = Oid2Type}.
+
+oid2type(Oid, #codec{oid2type = Oid2Type}) ->
+    case pgsql_types:oid2type(Oid) of
+        {unknown_oid, _} ->
+            proplists:get_value(Oid, Oid2Type, {unknown_oid, Oid});
+        Type -> Type
+    end.
+
+type2oid(Type, #codec{type2oid = Type2Oid}) ->
+    case pgsql_types:type2oid(Type) of
+        {unknown_type, _} ->
+            proplists:get_value(Type, Type2Oid, {unknown_type, Type});
+        Oid -> Oid
+    end.
+
+encode(_Any, null, _)                       -> <<-1:?int32>>;
+encode(bool, true, _)                       -> <<1:?int32, 1:1/big-signed-unit:8>>;
+encode(bool, false, _)                      -> <<1:?int32, 0:1/big-signed-unit:8>>;
+encode(int2, N, _)                          -> <<2:?int32, N:1/big-signed-unit:16>>;
+encode(int4, N, _)                          -> <<4:?int32, N:1/big-signed-unit:32>>;
+encode(int8, N, _)                          -> <<8:?int32, N:1/big-signed-unit:64>>;
+encode(float4, N, _)                        -> <<4:?int32, N:1/big-float-unit:32>>;
+encode(float8, N, _)                        -> <<8:?int32, N:1/big-float-unit:64>>;
+encode(bpchar, C, _) when is_integer(C)     -> <<1:?int32, C:1/big-unsigned-unit:8>>;
+encode(bpchar, B, _) when is_binary(B)      -> <<(byte_size(B)):?int32, B/binary>>;
+encode(time = Type, B, _)                   -> ?datetime:encode(Type, B);
+encode(timetz = Type, B, _)                 -> ?datetime:encode(Type, B);
+encode(date = Type, B, _)                   -> ?datetime:encode(Type, B);
+encode(timestamp = Type, B, _)              -> ?datetime:encode(Type, B);
+encode(timestamptz = Type, B, _)            -> ?datetime:encode(Type, B);
+encode(interval = Type, B, _)               -> ?datetime:encode(Type, B);
+encode(bytea, B, _) when is_binary(B)       -> <<(byte_size(B)):?int32, B/binary>>;
+encode(text, B, _) when is_binary(B)        -> <<(byte_size(B)):?int32, B/binary>>;
+encode(varchar, B, _) when is_binary(B)     -> <<(byte_size(B)):?int32, B/binary>>;
+encode(uuid, B, _) when is_binary(B)        -> encode_uuid(B);
+encode({array, char}, L, Codec) when is_list(L) -> encode_array(bpchar, type2oid(bpchar, Codec), L, Codec);
+encode({array, Type}, L, Codec) when is_list(L) -> encode_array(Type, type2oid(Type, Codec), L, Codec);
+encode(hstore, {L}, _) when is_list(L)      -> encode_hstore(L);
+encode(Type, L, Codec) when is_list(L)      -> encode(Type, list_to_binary(L), Codec);
+encode(_Type, _Value, _)                    -> {error, unsupported}.
+
+decode(bool, <<1:1/big-signed-unit:8>>, _)     -> true;
+decode(bool, <<0:1/big-signed-unit:8>>, _)     -> false;
+decode(bpchar, <<C:1/big-unsigned-unit:8>>, _) -> C;
+decode(int2, <<N:1/big-signed-unit:16>>, _)    -> N;
+decode(int4, <<N:1/big-signed-unit:32>>, _)    -> N;
+decode(int8, <<N:1/big-signed-unit:64>>, _)    -> N;
+decode(float4, <<N:1/big-float-unit:32>>, _)   -> N;
+decode(float8, <<N:1/big-float-unit:64>>, _)   -> N;
+decode(record, <<_:?int32, Rest/binary>>, Codec) -> list_to_tuple(decode_record(Rest, [], Codec));
+decode(time = Type, B, _)                      -> ?datetime:decode(Type, B);
+decode(timetz = Type, B, _)                    -> ?datetime:decode(Type, B);
+decode(date = Type, B, _)                      -> ?datetime:decode(Type, B);
+decode(timestamp = Type, B, _)                 -> ?datetime:decode(Type, B);
+decode(timestamptz = Type, B, _)               -> ?datetime:decode(Type, B);
+decode(interval = Type, B, _)                  -> ?datetime:decode(Type, B);
+decode(uuid, B, _)                             -> decode_uuid(B);
+decode(hstore, Hstore, _)                      -> decode_hstore(Hstore);
+decode({array, _Type}, B, Codec)               -> decode_array(B, Codec);
+decode(_Other, Bin, _)                         -> Bin.
+
+encode_array(Type, Oid, A, Codec) ->
+    {Data, {NDims, Lengths}} = encode_array(Type, A, 0, [], Codec),
     Lens = [<<N:?int32, 1:?int32>> || N <- lists:reverse(Lengths)],
     Hdr  = <<NDims:?int32, 0:?int32, Oid:?int32>>,
     Bin  = iolist_to_binary([Hdr, Lens, Data]),
     <<(byte_size(Bin)):?int32, Bin/binary>>.
 
-encode_array(_Type, [], NDims, Lengths) ->
+encode_array(_Type, [], NDims, Lengths, _Codec) ->
     {<<>>, {NDims, Lengths}};
-encode_array(Type, [H | _] = Array, NDims, Lengths) when not is_list(H) ->
-    F = fun(E, Len) -> {encode(Type, E), Len + 1} end,
+encode_array(Type, [H | _] = Array, NDims, Lengths, Codec) when not is_list(H) ->
+    F = fun(E, Len) -> {encode(Type, E, Codec), Len + 1} end,
     {Data, Len} = lists:mapfoldl(F, 0, Array),
     {Data, {NDims + 1, [Len | Lengths]}};
-encode_array(uuid, [_H | _] = Array, NDims, Lengths) ->
-    F = fun(E, Len) -> {encode(uuid, E), Len + 1} end,
+encode_array(uuid, [_H | _] = Array, NDims, Lengths, Codec) ->
+    F = fun(E, Len) -> {encode(uuid, E, Codec), Len + 1} end,
     {Data, Len} = lists:mapfoldl(F, 0, Array),
     {Data, {NDims + 1, [Len | Lengths]}};
-encode_array(Type, Array, NDims, Lengths) ->
+encode_array(Type, Array, NDims, Lengths, Codec) ->
     Lengths2 = [length(Array) | Lengths],
-    F = fun(A2, {_NDims, _Lengths}) -> encode_array(Type, A2, NDims, Lengths2) end,
+    F = fun(A2, {_NDims, _Lengths}) -> encode_array(Type, A2, NDims, Lengths2, Codec) end,
     {Data, {NDims2, Lengths3}} = lists:mapfoldl(F, {NDims, Lengths2}, Array),
     {Data, {NDims2 + 1, Lengths3}}.
 
@@ -105,36 +138,36 @@ encode_hstore_string(Str) when is_float(Str) ->
     encode_hstore_string(iolist_to_binary(io_lib:format("~w", [Str])));
 encode_hstore_string(Str) when is_binary(Str) -> <<(byte_size(Str)):?int32, Str/binary>>.
 
-decode_array(<<NDims:?int32, _HasNull:?int32, Oid:?int32, Rest/binary>>) ->
+decode_array(<<NDims:?int32, _HasNull:?int32, Oid:?int32, Rest/binary>>, Codec) ->
     {Dims, Data} = erlang:split_binary(Rest, NDims * 2 * 4),
     Lengths = [Len || <<Len:?int32, _LBound:?int32>> <= Dims],
-    Type = pgsql_types:oid2type(Oid),
-    {Array, <<>>} = decode_array(Data, Type, Lengths),
+    Type = oid2type(Oid, Codec),
+    {Array, <<>>} = decode_array(Data, Type, Lengths, Codec),
     Array.
 
-decode_array(Data, _Type, [])  ->
+decode_array(Data, _Type, [], _Codec)  ->
     {[], Data};
-decode_array(Data, Type, [Len]) ->
-    decode_elements(Data, Type, [], Len);
-decode_array(Data, Type, [Len | T]) ->
-    F = fun(_N, Rest) -> decode_array(Rest, Type, T) end,
+decode_array(Data, Type, [Len], Codec) ->
+    decode_elements(Data, Type, [], Len, Codec);
+decode_array(Data, Type, [Len | T], Codec) ->
+    F = fun(_N, Rest) -> decode_array(Rest, Type, T, Codec) end,
     lists:mapfoldl(F, Data, lists:seq(1, Len)).
 
-decode_elements(Rest, _Type, Acc, 0) ->
+decode_elements(Rest, _Type, Acc, 0, _Codec) ->
     {lists:reverse(Acc), Rest};
-decode_elements(<<-1:?int32, Rest/binary>>, Type, Acc, N) ->
-    decode_elements(Rest, Type, [null | Acc], N - 1);
-decode_elements(<<Len:?int32, Value:Len/binary, Rest/binary>>, Type, Acc, N) ->
-    Value2 = decode(Type, Value),
-    decode_elements(Rest, Type, [Value2 | Acc], N - 1).
+decode_elements(<<-1:?int32, Rest/binary>>, Type, Acc, N, Codec) ->
+    decode_elements(Rest, Type, [null | Acc], N - 1, Codec);
+decode_elements(<<Len:?int32, Value:Len/binary, Rest/binary>>, Type, Acc, N, Codec) ->
+    Value2 = decode(Type, Value, Codec),
+    decode_elements(Rest, Type, [Value2 | Acc], N - 1, Codec).
 
-decode_record(<<>>, Acc) ->
+decode_record(<<>>, Acc, _Codec) ->
     lists:reverse(Acc);
-decode_record(<<_Type:?int32, -1:?int32, Rest/binary>>, Acc) ->
-    decode_record(Rest, [null | Acc]);
-decode_record(<<Type:?int32, Len:?int32, Value:Len/binary, Rest/binary>>, Acc) ->
-    Value2 = decode(pgsql_types:oid2type(Type), Value),
-    decode_record(Rest, [Value2 | Acc]).
+decode_record(<<_Type:?int32, -1:?int32, Rest/binary>>, Acc, Codec) ->
+    decode_record(Rest, [null | Acc], Codec);
+decode_record(<<Type:?int32, Len:?int32, Value:Len/binary, Rest/binary>>, Acc, Codec) ->
+    Value2 = decode(oid2type(Type, Codec), Value, Codec),
+    decode_record(Rest, [Value2 | Acc], Codec).
 
 decode_uuid(<<U0:32, U1:16, U2:16, U3:16, U4:48>>) ->
     Format = "~8.16.0b-~4.16.0b-~4.16.0b-~4.16.0b-~12.16.0b",

+ 17 - 11
src/pgsql_sock.erl

@@ -62,6 +62,7 @@
                 data = <<>>,
                 backend,
                 handler,
+                codec,
                 queue = queue:new(),
                 async,
                 parameters = [],
@@ -93,6 +94,10 @@ cancel(S) ->
 init([]) ->
     {ok, #state{}}.
 
+handle_call({update_type_cache, TypeInfos}, _From, #state{codec = Codec} = State) ->
+    Codec2 = pgsql_binary:update_type_cache(TypeInfos, Codec),
+    {reply, ok, State#state{codec = Codec2}};
+
 handle_call({get_parameter, Name}, _From, State) ->
     case lists:keysearch(Name, 1, State#state.parameters) of
         {value, {Name, Value}} -> Value;
@@ -199,9 +204,9 @@ command({squery, Sql}, State) ->
 %% TODO add fast_equery command that doesn't need parsed statement,
 %% uses default (text) column format,
 %% sends Describe after Bind to get RowDescription
-command({equery, Statement, Parameters}, State) ->
+command({equery, Statement, Parameters}, #state{codec = Codec} = State) ->
     #statement{name = StatementName, columns = Columns} = Statement,
-    Bin1 = pgsql_wire:encode_parameters(Parameters),
+    Bin1 = pgsql_wire:encode_parameters(Parameters, Codec),
     Bin2 = pgsql_wire:encode_formats(Columns),
     send(State, ?BIND, ["", 0, StatementName, 0, Bin1, Bin2]),
     send(State, ?EXECUTE, ["", 0, <<0:?int32>>]),
@@ -210,16 +215,16 @@ command({equery, Statement, Parameters}, State) ->
     {noreply, State};
 
 command({parse, Name, Sql, Types}, State) ->
-    Bin = pgsql_wire:encode_types(Types),
+    Bin = pgsql_wire:encode_types(Types, State#state.codec),
     send(State, ?PARSE, [Name, 0, Sql, 0, Bin]),
     send(State, ?DESCRIBE, [?PREPARED_STATEMENT, Name, 0]),
     send(State, ?FLUSH, []),
     {noreply, State};
 
-command({bind, Statement, PortalName, Parameters}, State) ->
+command({bind, Statement, PortalName, Parameters}, #state{codec = Codec} = State) ->
     #statement{name = StatementName, columns = Columns, types = Types} = Statement,
     Typed_Parameters = lists:zip(Types, Parameters),
-    Bin1 = pgsql_wire:encode_parameters(Typed_Parameters),
+    Bin1 = pgsql_wire:encode_parameters(Typed_Parameters, Codec),
     Bin2 = pgsql_wire:encode_formats(Columns),
     send(State, ?BIND, [PortalName, 0, StatementName, 0, Bin1, Bin2]),
     send(State, ?FLUSH, []),
@@ -231,7 +236,7 @@ command({execute, _Statement, PortalName, MaxRows}, State) ->
     {noreply, State};
 
 command({execute_batch, Batch}, State) ->
-    #state{mod = Mod, sock = Sock} = State,
+    #state{mod = Mod, sock = Sock, codec = Codec} = State,
     BindExecute =
         lists:map(
           fun({Statement, Parameters}) ->
@@ -239,7 +244,7 @@ command({execute_batch, Batch}, State) ->
                              columns = Columns,
                              types = Types} = Statement,
                   Typed_Parameters = lists:zip(Types, Parameters),
-                  Bin1 = pgsql_wire:encode_parameters(Typed_Parameters),
+                  Bin1 = pgsql_wire:encode_parameters(Typed_Parameters, Codec),
                   Bin2 = pgsql_wire:encode_formats(Columns),
                   [pgsql_wire:encode(?BIND, [0, StatementName, 0,
                                              Bin1, Bin2]),
@@ -507,7 +512,8 @@ initializing({?READY_FOR_QUERY, <<Status:8>>}, State) ->
         {value, {_, <<"off">>}} -> put(datetime_mod, pgsql_fdatetime)
     end,
     State2 = finish(State#state{handler = on_message,
-                               txstatus = Status},
+                               txstatus = Status,
+                               codec = pgsql_binary:new_codec([])},
                    connected),
     {noreply, State2};
 
@@ -523,13 +529,13 @@ on_message({?PARSE_COMPLETE, <<>>}, State) ->
 
 %% ParameterDescription
 on_message({?PARAMETER_DESCRIPTION, <<_Count:?int16, Bin/binary>>}, State) ->
-    Types = [pgsql_types:oid2type(Oid) || <<Oid:?int32>> <= Bin],
+    Types = [pgsql_binary:oid2type(Oid, State#state.codec) || <<Oid:?int32>> <= Bin],
     State2 = notify(State#state{types = Types}, {types, Types}),
     {noreply, State2};
 
 %% RowDescription
 on_message({?ROW_DESCRIPTION, <<Count:?int16, Bin/binary>>}, State) ->
-    Columns = pgsql_wire:decode_columns(Count, Bin),
+    Columns = pgsql_wire:decode_columns(Count, Bin, State#state.codec),
     Columns2 =
         case command_tag(State) of
             C when C == describe_portal; C == squery ->
@@ -592,7 +598,7 @@ on_message({?CLOSE_COMPLETE, <<>>}, State) ->
 
 %% DataRow
 on_message({?DATA_ROW, <<_Count:?int16, Bin/binary>>}, State) ->
-    Data = pgsql_wire:decode_data(get_columns(State), Bin),
+    Data = pgsql_wire:decode_data(get_columns(State), Bin, State#state.codec),
     {noreply, add_row(State, Data)};
 
 %% PortalSuspended

+ 0 - 4
src/pgsql_types.erl

@@ -93,8 +93,6 @@ oid2type(2776) -> anynonarray;
 oid2type(2950) -> uuid;
 oid2type(2951) -> {array, uuid};
 oid2type(3500) -> anyenum;
-oid2type(16831) -> hstore;
-oid2type(16836) -> {array, hstore};
 oid2type(Oid)  -> {unknown_oid, Oid}.
 
 type2oid(bool)                  -> 16;
@@ -188,6 +186,4 @@ type2oid(anynonarray)           -> 2776;
 type2oid(uuid)                  -> 2950;
 type2oid({array, uuid})         -> 2951;
 type2oid(anyenum)               -> 3500;
-type2oid(hstore)                -> 16831;
-type2oid({array, hstore})       -> 16836;
 type2oid(Type)                  -> {unknown_type, Type}.

+ 34 - 32
src/pgsql_wire.erl

@@ -6,15 +6,15 @@
 -export([decode_message/1,
          decode_error/1,
          decode_strings/1,
-         decode_columns/2,
+         decode_columns/3,
          encode/1,
          encode/2,
-         decode_data/2,
+         decode_data/3,
          decode_complete/1,
-         encode_types/1,
+         encode_types/2,
          encode_formats/1,
          format/1,
-         encode_parameters/1]).
+         encode_parameters/2]).
 
 -include("pgsql.hrl").
 -include("pgsql_binary.hrl").
@@ -92,37 +92,37 @@ encode(Type, Data) ->
     <<Type:8, (byte_size(Bin) + 4):?int32, Bin/binary>>.
 
 %% decode data
-decode_data(Columns, Bin) ->
-    decode_data(Columns, Bin, []).
+decode_data(Columns, Bin, Codec) ->
+    decode_data(Columns, Bin, [], Codec).
 
-decode_data([], _Bin, Acc) ->
+decode_data([], _Bin, Acc, _Codec) ->
     list_to_tuple(lists:reverse(Acc));
-decode_data([_C | T], <<-1:?int32, Rest/binary>>, Acc) ->
-    decode_data(T, Rest, [null | Acc]);
-decode_data([C | T], <<Len:?int32, Value:Len/binary, Rest/binary>>, Acc) ->
+decode_data([_C | T], <<-1:?int32, Rest/binary>>, Acc, Codec) ->
+    decode_data(T, Rest, [null | Acc], Codec);
+decode_data([C | T], <<Len:?int32, Value:Len/binary, Rest/binary>>, Acc, Codec) ->
     case C of
-        #column{type = Type, format = 1}   -> Value2 = pgsql_binary:decode(Type, Value);
+        #column{type = Type, format = 1}   -> Value2 = pgsql_binary:decode(Type, Value, Codec);
         #column{}                          -> Value2 = Value
     end,
-    decode_data(T, Rest, [Value2 | Acc]).
+    decode_data(T, Rest, [Value2 | Acc], Codec).
 
 %% decode column information
-decode_columns(Count, Bin) ->
-    decode_columns(Count, Bin, []).
+decode_columns(Count, Bin, Codec) ->
+    decode_columns(Count, Bin, [], Codec).
 
-decode_columns(0, _Bin, Acc) ->
+decode_columns(0, _Bin, Acc, _Codec) ->
     lists:reverse(Acc);
-decode_columns(N, Bin, Acc) ->
+decode_columns(N, Bin, Acc, Codec) ->
     [Name, Rest] = decode_string(Bin),
     <<_Table_Oid:?int32, _Attrib_Num:?int16, Type_Oid:?int32,
      Size:?int16, Modifier:?int32, Format:?int16, Rest2/binary>> = Rest,
     Desc = #column{
       name     = Name,
-      type     = pgsql_types:oid2type(Type_Oid),
+      type     = pgsql_binary:oid2type(Type_Oid, Codec),
       size     = Size,
       modifier = Modifier,
       format   = Format},
-    decode_columns(N - 1, Rest2, [Desc | Acc]).
+    decode_columns(N - 1, Rest2, [Desc | Acc], Codec).
 
 %% decode command complete msg
 decode_complete(<<"SELECT", 0>>)        -> select;
@@ -141,18 +141,18 @@ decode_complete(Bin) ->
     end.
 
 %% encode types
-encode_types(Types) ->
-    encode_types(Types, 0, <<>>).
+encode_types(Types, Codec) ->
+    encode_types(Types, 0, <<>>, Codec).
 
-encode_types([], Count, Acc) ->
+encode_types([], Count, Acc, _Codec) ->
     <<Count:?int16, Acc/binary>>;
 
-encode_types([Type | T], Count, Acc) ->
+encode_types([Type | T], Count, Acc, Codec) ->
     case Type of
         undefined -> Oid = 0;
-        _Any      -> Oid = pgsql_types:type2oid(Type)
+        _Any      -> Oid = pgsql_binary:type2oid(Type, Codec)
     end,
-    encode_types(T, Count + 1, <<Acc/binary, Oid:?int32>>).
+    encode_types(T, Count + 1, <<Acc/binary, Oid:?int32>>, Codec).
 
 %% encode column formats
 encode_formats(Columns) ->
@@ -171,25 +171,27 @@ format(Type) ->
     end.
 
 %% encode parameters
-encode_parameters(Parameters) ->
-    encode_parameters(Parameters, 0, <<>>, <<>>).
+encode_parameters(Parameters, Codec) ->
+    encode_parameters(Parameters, 0, <<>>, <<>>, Codec).
 
-encode_parameters([], Count, Formats, Values) ->
+encode_parameters([], Count, Formats, Values, _Codec) ->
     <<Count:?int16, Formats/binary, Count:?int16, Values/binary>>;
 
-encode_parameters([P | T], Count, Formats, Values) ->
-    {Format, Value} = encode_parameter(P),
+encode_parameters([P | T], Count, Formats, Values, Codec) ->
+    {Format, Value} = encode_parameter(P, Codec),
     Formats2 = <<Formats/binary, Format:?int16>>,
     Values2 = <<Values/binary, Value/binary>>,
-    encode_parameters(T, Count + 1, Formats2, Values2).
+    encode_parameters(T, Count + 1, Formats2, Values2, Codec).
 
 %% encode parameter
 
-encode_parameter({Type, Value}) ->
-    case pgsql_binary:encode(Type, Value) of
+encode_parameter({Type, Value}, Codec) ->
+    case pgsql_binary:encode(Type, Value, Codec) of
         Bin when is_binary(Bin) -> {1, Bin};
         {error, unsupported}    -> encode_parameter(Value)
     end;
+encode_parameter(Value, _Codec) -> encode_parameter(Value).
+
 encode_parameter(A) when is_atom(A)    -> {0, encode_list(atom_to_list(A))};
 encode_parameter(B) when is_binary(B)  -> {0, <<(byte_size(B)):?int32, B/binary>>};
 encode_parameter(I) when is_integer(I) -> {0, encode_list(integer_to_list(I))};