Browse Source

Merge remote-tracking branch 'epgsql/devel'

Conflicts:
	src/epgsql_sock.erl
Eugene Shubin 9 years ago
parent
commit
57991c23c1

+ 2 - 1
.travis.yml

@@ -3,9 +3,10 @@ otp_release:
   - R15B03
   - R16B03-1
   - 17.4
+  - 18.2
 before_script:
   - sudo apt-get update -qq
-  - sudo apt-get install postgresql-9.3-postgis-2.1 postgresql-contrib
+  - sudo apt-get install postgresql-9.3-postgis-2.1 postgresql-contrib-9.3
 addons:
   postgresql: "9.3"
 script:

+ 4 - 1
Makefile

@@ -3,12 +3,15 @@ LASTVERSION = $(shell git rev-parse HEAD )
 
 all: compile
 
-compile:
+compile: src/epgsql_errcodes.erl
 	@$(REBAR) compile
 
 clean:
 	@$(REBAR) clean
 
+src/epgsql_errcodes.erl:
+	./generate_errcodes_src.sh > src/epgsql_errcodes.erl
+
 # The INSERT is used to make sure the schema_version matches the tests
 # being run.
 create_testdbs:

+ 21 - 3
README.md

@@ -4,6 +4,23 @@ Asynchronous fork of [wg/epgsql](https://github.com/wg/epgsql) originally here:
 [mabrek/epgsql](https://github.com/mabrek/epgsql) and subsequently forked in order to
 provide a common fork for community development.
 
+## pgapp
+
+If you want to get up to speed quickly with code that lets you run
+Postgres queries, you might consider trying
+[epgsql/pgapp](https://github.com/epgsql/pgapp), which adds the
+following, on top of the epgsql driver:
+
+- A 'resource pool' (currently poolboy), which lets you decide how
+  many Postgres workers you want to utilize.
+- Resilience against the database going down or other problems.  The
+  pgapp code will keep trying to reconnect to the database, but will
+  not propagate the crash up the supervisor tree, so that, for
+  instance, your web site will stay up even if the database is down
+  for some reason.  Erlang's "let it crash" is a good idea, but
+  external resources going away might not be a good reason to crash
+  your entire system.
+
 ## Motivation
 
 When you need to execute several queries, it involves a number network
@@ -107,9 +124,9 @@ Asynchronous connect example (applies to **epgsqli** too):
 }).
 
 -type ok_reply(RowType) ::
-    {ok, Count :: non_neg_integer()} |                                                            % select
-    {ok, ColumnsDescription :: [#column{}], RowsValues :: [RowType]} |                            % update/insert
-    {ok, Count :: non_neg_integer(), ColumnsDescription :: [#column{}], RowsValues :: [RowType]}. % update/insert + returning
+    {ok, ColumnsDescription :: [#column{}], RowsValues :: [RowType]} |                            % select
+    {ok, Count :: non_neg_integer()} |                                                            % update/insert/delete
+    {ok, Count :: non_neg_integer(), ColumnsDescription :: [#column{}], RowsValues :: [RowType]}. % update/insert/delete + returning
 -type error_reply() :: {error, query_error()}.
 -type reply(RowType) :: ok_reply() | error_reply().
 
@@ -382,6 +399,7 @@ PG type       | Representation
   record      | `{int2, time, text, ...}` (decode only)
   point       |  `{10.2, 100.12}`
   int4range   | `[1,5)`
+  hstore      | `{list({binary(), binary() | null})}`
 
   `timestamp` and `timestamptz` parameters can take `erlang:now()` format: `{MegaSeconds, Seconds, MicroSeconds}`
 

+ 19 - 0
generate_errcodes_src.sh

@@ -0,0 +1,19 @@
+#!/usr/bin/env bash -e
+#
+# Used to generate epgsql_errcodes.erl
+#
+ERRFILE="https://raw.github.com/postgres/postgres/master/src/backend/utils/errcodes.txt"
+echo "%% DO NOT EDIT - AUTOGENERATED ON $(date)"
+echo "-module(epgsql_errcodes)."
+echo "-export([to_name/1])."
+echo
+wget -qO- "$ERRFILE" | awk '
+NF == 4 && \
+$1 ~ /[^\s]{5}/ && \
+$2 ~ /[EWS]/ \
+{
+    printf("to_name(<<\"%s\">>) -> %s;\n", $1, $4)
+}
+END {
+    print "to_name(_) -> undefined."
+}'

+ 1 - 0
include/epgsql.hrl

@@ -17,6 +17,7 @@
 -record(error, {
     severity :: fatal | error | atom(), %TODO: concretize
     code :: binary(),
+    codename :: atom(),
     message :: binary(),
     extra :: [{detail, binary()} | {hint, binary()} | {position, binary()}]
 }).

+ 1 - 0
setup_test_db.sh

@@ -12,6 +12,7 @@ initdb --locale en_US.UTF-8 datadir
 cat > datadir/postgresql.conf <<EOF
 ssl = on
 ssl_ca_file = 'root.crt'
+lc_messages = 'en_US.UTF-8'
 EOF
 
 cp test_data/epgsql.crt datadir/server.crt

+ 1 - 1
src/epgsql.app.src

@@ -1,6 +1,6 @@
 {application, epgsql,
  [{description, "PostgreSQL Client"},
-  {vsn, "3.1.1"},
+  {vsn, "3.3.0"},
   {modules, []},
   {registered, []},
   {applications, [kernel,

+ 18 - 7
src/epgsql.erl

@@ -18,6 +18,7 @@
          sync/1,
          cancel/1,
          update_type_cache/1,
+         update_type_cache/2,
          with_transaction/2,
          sync_on_error/2]).
 
@@ -52,14 +53,15 @@
         | calendar:time()                       %actualy, `Seconds' may be float()
         | calendar:datetime()
         | {calendar:time(), Days::non_neg_integer(), Months::non_neg_integer()}
+        | {list({binary(), binary() | null})}   % hstore
         | [bind_param()].                       %array (maybe nested)
 
 -type squery_row() :: {binary()}.
 -type equery_row() :: {bind_param()}.
 -type ok_reply(RowType) ::
-    {ok, Count :: non_neg_integer()} |                                                            % select
-    {ok, ColumnsDescription :: [#column{}], RowsValues :: [RowType]} |                            % update/insert
-    {ok, Count :: non_neg_integer(), ColumnsDescription :: [#column{}], RowsValues :: [RowType]}. % update/insert + returning
+    {ok, ColumnsDescription :: [#column{}], RowsValues :: [RowType]} |                            % select
+    {ok, Count :: non_neg_integer()} |                                                            % update/insert/delete
+    {ok, Count :: non_neg_integer(), ColumnsDescription :: [#column{}], RowsValues :: [RowType]}. % update/insert/delete + returning
 -type error_reply() :: {error, query_error()}.
 -type reply(RowType) :: ok_reply(RowType) | error_reply().
 
@@ -105,12 +107,22 @@ connect(C, Host, Username, Password, Opts) ->
 
 -spec update_type_cache(connection()) -> ok.
 update_type_cache(C) ->
-    DynamicTypes = [<<"hstore">>,<<"geometry">>],
+    update_type_cache(C, [<<"hstore">>,<<"geometry">>]).
+
+-spec update_type_cache(connection(), [binary()]) -> ok.
+update_type_cache(C, DynamicTypes) ->
     Query = "SELECT typname, oid::int4, typarray::int4"
             " FROM pg_type"
             " WHERE typname = ANY($1::varchar[])",
-    {ok, _, TypeInfos} = equery(C, Query, [DynamicTypes]),
-    ok = gen_server:call(C, {update_type_cache, TypeInfos}).
+    case equery(C, Query, [DynamicTypes]) of
+        {ok, _, TypeInfos} ->
+            ok = gen_server:call(C, {update_type_cache, TypeInfos});
+        {error, {error, error, _, _,
+                 <<"column \"typarray\" does not exist in pg_type">>, _}} ->
+            %% Do not fail connect if pg_type table in not in the expected
+            %% format. Known to happen for Redshift which is based on PG v8.0.2
+            ok
+    end.
 
 -spec close(connection()) -> ok.
 close(C) ->
@@ -253,4 +265,3 @@ sync_on_error(C, Error = {error, _}) ->
 
 sync_on_error(_C, R) ->
     R.
-

+ 1 - 0
src/epgsql_binary.erl

@@ -301,4 +301,5 @@ supports({array, varchar}) -> true;
 supports({array, uuid})   -> true;
 supports({array, cidr})   -> true;
 supports({array, inet})   -> true;
+supports({array, record}) -> true;
 supports(_Type)       -> false.

+ 239 - 0
src/epgsql_errcodes.erl

@@ -0,0 +1,239 @@
+%% DO NOT EDIT - AUTOGENERATED ON Mon  1 Jun 2015 14:42:37 BST
+-module(epgsql_errcodes).
+-export([to_name/1]).
+
+to_name(<<"00000">>) -> successful_completion;
+to_name(<<"01000">>) -> warning;
+to_name(<<"0100C">>) -> dynamic_result_sets_returned;
+to_name(<<"01008">>) -> implicit_zero_bit_padding;
+to_name(<<"01003">>) -> null_value_eliminated_in_set_function;
+to_name(<<"01007">>) -> privilege_not_granted;
+to_name(<<"01006">>) -> privilege_not_revoked;
+to_name(<<"01004">>) -> string_data_right_truncation;
+to_name(<<"01P01">>) -> deprecated_feature;
+to_name(<<"02000">>) -> no_data;
+to_name(<<"02001">>) -> no_additional_dynamic_result_sets_returned;
+to_name(<<"03000">>) -> sql_statement_not_yet_complete;
+to_name(<<"08000">>) -> connection_exception;
+to_name(<<"08003">>) -> connection_does_not_exist;
+to_name(<<"08006">>) -> connection_failure;
+to_name(<<"08001">>) -> sqlclient_unable_to_establish_sqlconnection;
+to_name(<<"08004">>) -> sqlserver_rejected_establishment_of_sqlconnection;
+to_name(<<"08007">>) -> transaction_resolution_unknown;
+to_name(<<"08P01">>) -> protocol_violation;
+to_name(<<"09000">>) -> triggered_action_exception;
+to_name(<<"0A000">>) -> feature_not_supported;
+to_name(<<"0B000">>) -> invalid_transaction_initiation;
+to_name(<<"0F000">>) -> locator_exception;
+to_name(<<"0F001">>) -> invalid_locator_specification;
+to_name(<<"0L000">>) -> invalid_grantor;
+to_name(<<"0LP01">>) -> invalid_grant_operation;
+to_name(<<"0P000">>) -> invalid_role_specification;
+to_name(<<"0Z000">>) -> diagnostics_exception;
+to_name(<<"0Z002">>) -> stacked_diagnostics_accessed_without_active_handler;
+to_name(<<"20000">>) -> case_not_found;
+to_name(<<"21000">>) -> cardinality_violation;
+to_name(<<"22000">>) -> data_exception;
+to_name(<<"2202E">>) -> array_subscript_error;
+to_name(<<"22021">>) -> character_not_in_repertoire;
+to_name(<<"22008">>) -> datetime_field_overflow;
+to_name(<<"22012">>) -> division_by_zero;
+to_name(<<"22005">>) -> error_in_assignment;
+to_name(<<"2200B">>) -> escape_character_conflict;
+to_name(<<"22022">>) -> indicator_overflow;
+to_name(<<"22015">>) -> interval_field_overflow;
+to_name(<<"2201E">>) -> invalid_argument_for_logarithm;
+to_name(<<"22014">>) -> invalid_argument_for_ntile_function;
+to_name(<<"22016">>) -> invalid_argument_for_nth_value_function;
+to_name(<<"2201F">>) -> invalid_argument_for_power_function;
+to_name(<<"2201G">>) -> invalid_argument_for_width_bucket_function;
+to_name(<<"22018">>) -> invalid_character_value_for_cast;
+to_name(<<"22007">>) -> invalid_datetime_format;
+to_name(<<"22019">>) -> invalid_escape_character;
+to_name(<<"2200D">>) -> invalid_escape_octet;
+to_name(<<"22025">>) -> invalid_escape_sequence;
+to_name(<<"22P06">>) -> nonstandard_use_of_escape_character;
+to_name(<<"22010">>) -> invalid_indicator_parameter_value;
+to_name(<<"22023">>) -> invalid_parameter_value;
+to_name(<<"2201B">>) -> invalid_regular_expression;
+to_name(<<"2201W">>) -> invalid_row_count_in_limit_clause;
+to_name(<<"2201X">>) -> invalid_row_count_in_result_offset_clause;
+to_name(<<"22009">>) -> invalid_time_zone_displacement_value;
+to_name(<<"2200C">>) -> invalid_use_of_escape_character;
+to_name(<<"2200G">>) -> most_specific_type_mismatch;
+to_name(<<"22004">>) -> null_value_not_allowed;
+to_name(<<"22002">>) -> null_value_no_indicator_parameter;
+to_name(<<"22003">>) -> numeric_value_out_of_range;
+to_name(<<"22026">>) -> string_data_length_mismatch;
+to_name(<<"22001">>) -> string_data_right_truncation;
+to_name(<<"22011">>) -> substring_error;
+to_name(<<"22027">>) -> trim_error;
+to_name(<<"22024">>) -> unterminated_c_string;
+to_name(<<"2200F">>) -> zero_length_character_string;
+to_name(<<"22P01">>) -> floating_point_exception;
+to_name(<<"22P02">>) -> invalid_text_representation;
+to_name(<<"22P03">>) -> invalid_binary_representation;
+to_name(<<"22P04">>) -> bad_copy_file_format;
+to_name(<<"22P05">>) -> untranslatable_character;
+to_name(<<"2200L">>) -> not_an_xml_document;
+to_name(<<"2200M">>) -> invalid_xml_document;
+to_name(<<"2200N">>) -> invalid_xml_content;
+to_name(<<"2200S">>) -> invalid_xml_comment;
+to_name(<<"2200T">>) -> invalid_xml_processing_instruction;
+to_name(<<"23000">>) -> integrity_constraint_violation;
+to_name(<<"23001">>) -> restrict_violation;
+to_name(<<"23502">>) -> not_null_violation;
+to_name(<<"23503">>) -> foreign_key_violation;
+to_name(<<"23505">>) -> unique_violation;
+to_name(<<"23514">>) -> check_violation;
+to_name(<<"23P01">>) -> exclusion_violation;
+to_name(<<"24000">>) -> invalid_cursor_state;
+to_name(<<"25000">>) -> invalid_transaction_state;
+to_name(<<"25001">>) -> active_sql_transaction;
+to_name(<<"25002">>) -> branch_transaction_already_active;
+to_name(<<"25008">>) -> held_cursor_requires_same_isolation_level;
+to_name(<<"25003">>) -> inappropriate_access_mode_for_branch_transaction;
+to_name(<<"25004">>) -> inappropriate_isolation_level_for_branch_transaction;
+to_name(<<"25005">>) -> no_active_sql_transaction_for_branch_transaction;
+to_name(<<"25006">>) -> read_only_sql_transaction;
+to_name(<<"25007">>) -> schema_and_data_statement_mixing_not_supported;
+to_name(<<"25P01">>) -> no_active_sql_transaction;
+to_name(<<"25P02">>) -> in_failed_sql_transaction;
+to_name(<<"26000">>) -> invalid_sql_statement_name;
+to_name(<<"27000">>) -> triggered_data_change_violation;
+to_name(<<"28000">>) -> invalid_authorization_specification;
+to_name(<<"28P01">>) -> invalid_password;
+to_name(<<"2B000">>) -> dependent_privilege_descriptors_still_exist;
+to_name(<<"2BP01">>) -> dependent_objects_still_exist;
+to_name(<<"2D000">>) -> invalid_transaction_termination;
+to_name(<<"2F000">>) -> sql_routine_exception;
+to_name(<<"2F005">>) -> function_executed_no_return_statement;
+to_name(<<"2F002">>) -> modifying_sql_data_not_permitted;
+to_name(<<"2F003">>) -> prohibited_sql_statement_attempted;
+to_name(<<"2F004">>) -> reading_sql_data_not_permitted;
+to_name(<<"34000">>) -> invalid_cursor_name;
+to_name(<<"38000">>) -> external_routine_exception;
+to_name(<<"38001">>) -> containing_sql_not_permitted;
+to_name(<<"38002">>) -> modifying_sql_data_not_permitted;
+to_name(<<"38003">>) -> prohibited_sql_statement_attempted;
+to_name(<<"38004">>) -> reading_sql_data_not_permitted;
+to_name(<<"39000">>) -> external_routine_invocation_exception;
+to_name(<<"39001">>) -> invalid_sqlstate_returned;
+to_name(<<"39004">>) -> null_value_not_allowed;
+to_name(<<"39P01">>) -> trigger_protocol_violated;
+to_name(<<"39P02">>) -> srf_protocol_violated;
+to_name(<<"39P03">>) -> event_trigger_protocol_violated;
+to_name(<<"3B000">>) -> savepoint_exception;
+to_name(<<"3B001">>) -> invalid_savepoint_specification;
+to_name(<<"3D000">>) -> invalid_catalog_name;
+to_name(<<"3F000">>) -> invalid_schema_name;
+to_name(<<"40000">>) -> transaction_rollback;
+to_name(<<"40002">>) -> transaction_integrity_constraint_violation;
+to_name(<<"40001">>) -> serialization_failure;
+to_name(<<"40003">>) -> statement_completion_unknown;
+to_name(<<"40P01">>) -> deadlock_detected;
+to_name(<<"42000">>) -> syntax_error_or_access_rule_violation;
+to_name(<<"42601">>) -> syntax_error;
+to_name(<<"42501">>) -> insufficient_privilege;
+to_name(<<"42846">>) -> cannot_coerce;
+to_name(<<"42803">>) -> grouping_error;
+to_name(<<"42P20">>) -> windowing_error;
+to_name(<<"42P19">>) -> invalid_recursion;
+to_name(<<"42830">>) -> invalid_foreign_key;
+to_name(<<"42602">>) -> invalid_name;
+to_name(<<"42622">>) -> name_too_long;
+to_name(<<"42939">>) -> reserved_name;
+to_name(<<"42804">>) -> datatype_mismatch;
+to_name(<<"42P18">>) -> indeterminate_datatype;
+to_name(<<"42P21">>) -> collation_mismatch;
+to_name(<<"42P22">>) -> indeterminate_collation;
+to_name(<<"42809">>) -> wrong_object_type;
+to_name(<<"42703">>) -> undefined_column;
+to_name(<<"42883">>) -> undefined_function;
+to_name(<<"42P01">>) -> undefined_table;
+to_name(<<"42P02">>) -> undefined_parameter;
+to_name(<<"42704">>) -> undefined_object;
+to_name(<<"42701">>) -> duplicate_column;
+to_name(<<"42P03">>) -> duplicate_cursor;
+to_name(<<"42P04">>) -> duplicate_database;
+to_name(<<"42723">>) -> duplicate_function;
+to_name(<<"42P05">>) -> duplicate_prepared_statement;
+to_name(<<"42P06">>) -> duplicate_schema;
+to_name(<<"42P07">>) -> duplicate_table;
+to_name(<<"42712">>) -> duplicate_alias;
+to_name(<<"42710">>) -> duplicate_object;
+to_name(<<"42702">>) -> ambiguous_column;
+to_name(<<"42725">>) -> ambiguous_function;
+to_name(<<"42P08">>) -> ambiguous_parameter;
+to_name(<<"42P09">>) -> ambiguous_alias;
+to_name(<<"42P10">>) -> invalid_column_reference;
+to_name(<<"42611">>) -> invalid_column_definition;
+to_name(<<"42P11">>) -> invalid_cursor_definition;
+to_name(<<"42P12">>) -> invalid_database_definition;
+to_name(<<"42P13">>) -> invalid_function_definition;
+to_name(<<"42P14">>) -> invalid_prepared_statement_definition;
+to_name(<<"42P15">>) -> invalid_schema_definition;
+to_name(<<"42P16">>) -> invalid_table_definition;
+to_name(<<"42P17">>) -> invalid_object_definition;
+to_name(<<"44000">>) -> with_check_option_violation;
+to_name(<<"53000">>) -> insufficient_resources;
+to_name(<<"53100">>) -> disk_full;
+to_name(<<"53200">>) -> out_of_memory;
+to_name(<<"53300">>) -> too_many_connections;
+to_name(<<"53400">>) -> configuration_limit_exceeded;
+to_name(<<"54000">>) -> program_limit_exceeded;
+to_name(<<"54001">>) -> statement_too_complex;
+to_name(<<"54011">>) -> too_many_columns;
+to_name(<<"54023">>) -> too_many_arguments;
+to_name(<<"55000">>) -> object_not_in_prerequisite_state;
+to_name(<<"55006">>) -> object_in_use;
+to_name(<<"55P02">>) -> cant_change_runtime_param;
+to_name(<<"55P03">>) -> lock_not_available;
+to_name(<<"57000">>) -> operator_intervention;
+to_name(<<"57014">>) -> query_canceled;
+to_name(<<"57P01">>) -> admin_shutdown;
+to_name(<<"57P02">>) -> crash_shutdown;
+to_name(<<"57P03">>) -> cannot_connect_now;
+to_name(<<"57P04">>) -> database_dropped;
+to_name(<<"58000">>) -> system_error;
+to_name(<<"58030">>) -> io_error;
+to_name(<<"58P01">>) -> undefined_file;
+to_name(<<"58P02">>) -> duplicate_file;
+to_name(<<"F0000">>) -> config_file_error;
+to_name(<<"F0001">>) -> lock_file_exists;
+to_name(<<"HV000">>) -> fdw_error;
+to_name(<<"HV005">>) -> fdw_column_name_not_found;
+to_name(<<"HV002">>) -> fdw_dynamic_parameter_value_needed;
+to_name(<<"HV010">>) -> fdw_function_sequence_error;
+to_name(<<"HV021">>) -> fdw_inconsistent_descriptor_information;
+to_name(<<"HV024">>) -> fdw_invalid_attribute_value;
+to_name(<<"HV007">>) -> fdw_invalid_column_name;
+to_name(<<"HV008">>) -> fdw_invalid_column_number;
+to_name(<<"HV004">>) -> fdw_invalid_data_type;
+to_name(<<"HV006">>) -> fdw_invalid_data_type_descriptors;
+to_name(<<"HV091">>) -> fdw_invalid_descriptor_field_identifier;
+to_name(<<"HV00B">>) -> fdw_invalid_handle;
+to_name(<<"HV00C">>) -> fdw_invalid_option_index;
+to_name(<<"HV00D">>) -> fdw_invalid_option_name;
+to_name(<<"HV090">>) -> fdw_invalid_string_length_or_buffer_length;
+to_name(<<"HV00A">>) -> fdw_invalid_string_format;
+to_name(<<"HV009">>) -> fdw_invalid_use_of_null_pointer;
+to_name(<<"HV014">>) -> fdw_too_many_handles;
+to_name(<<"HV001">>) -> fdw_out_of_memory;
+to_name(<<"HV00P">>) -> fdw_no_schemas;
+to_name(<<"HV00J">>) -> fdw_option_name_not_found;
+to_name(<<"HV00K">>) -> fdw_reply_handle;
+to_name(<<"HV00Q">>) -> fdw_schema_not_found;
+to_name(<<"HV00R">>) -> fdw_table_not_found;
+to_name(<<"HV00L">>) -> fdw_unable_to_create_execution;
+to_name(<<"HV00M">>) -> fdw_unable_to_create_reply;
+to_name(<<"HV00N">>) -> fdw_unable_to_establish_connection;
+to_name(<<"P0000">>) -> plpgsql_error;
+to_name(<<"P0001">>) -> raise_exception;
+to_name(<<"P0002">>) -> no_data_found;
+to_name(<<"P0003">>) -> too_many_rows;
+to_name(<<"P0004">>) -> assert_failure;
+to_name(<<"XX000">>) -> internal_error;
+to_name(<<"XX001">>) -> data_corrupted;
+to_name(<<"XX002">>) -> index_corrupted;
+to_name(_) -> undefined.

+ 57 - 35
src/epgsql_sock.erl

@@ -217,10 +217,12 @@ command({equery, Statement, Parameters}, #state{codec = Codec} = State) ->
     #statement{name = StatementName, columns = Columns} = Statement,
     Bin1 = epgsql_wire:encode_parameters(Parameters, Codec),
     Bin2 = epgsql_wire:encode_formats(Columns),
-    send(State, ?BIND, ["", 0, StatementName, 0, Bin1, Bin2]),
-    send(State, ?EXECUTE, ["", 0, <<0:?int32>>]),
-    send(State, ?CLOSE, [?PREPARED_STATEMENT, StatementName, 0]),
-    send(State, ?SYNC, []),
+    send_multi(State, [
+        {?BIND, ["", 0, StatementName, 0, Bin1, Bin2]},
+        {?EXECUTE, ["", 0, <<0:?int32>>]},
+        {?CLOSE, [?PREPARED_STATEMENT, StatementName, 0]},
+        {?SYNC, []}
+    ]),
     {noreply, State};
 
 command({prepared_query, Statement, Parameters}, #state{codec = Codec} = State) ->
@@ -234,9 +236,11 @@ command({prepared_query, Statement, Parameters}, #state{codec = Codec} = State)
 
 command({parse, Name, Sql, Types}, State) ->
     Bin = epgsql_wire:encode_types(Types, State#state.codec),
-    send(State, ?PARSE, [Name, 0, Sql, 0, Bin]),
-    send(State, ?DESCRIBE, [?PREPARED_STATEMENT, Name, 0]),
-    send(State, ?FLUSH, []),
+    send_multi(State, [
+        {?PARSE, [Name, 0, Sql, 0, Bin]},
+        {?DESCRIBE, [?PREPARED_STATEMENT, Name, 0]},
+        {?FLUSH, []}
+    ]),
     {noreply, State};
 
 command({bind, Statement, PortalName, Parameters}, #state{codec = Codec} = State) ->
@@ -244,43 +248,49 @@ command({bind, Statement, PortalName, Parameters}, #state{codec = Codec} = State
     Typed_Parameters = lists:zip(Types, Parameters),
     Bin1 = epgsql_wire:encode_parameters(Typed_Parameters, Codec),
     Bin2 = epgsql_wire:encode_formats(Columns),
-    send(State, ?BIND, [PortalName, 0, StatementName, 0, Bin1, Bin2]),
-    send(State, ?FLUSH, []),
+    send_multi(State, [
+        {?BIND, [PortalName, 0, StatementName, 0, Bin1, Bin2]},
+        {?FLUSH, []}
+    ]),
     {noreply, State};
 
 command({execute, _Statement, PortalName, MaxRows}, State) ->
-    send(State, ?EXECUTE, [PortalName, 0, <<MaxRows:?int32>>]),
-    send(State, ?FLUSH, []),
+    send_multi(State, [
+        {?EXECUTE, [PortalName, 0, <<MaxRows:?int32>>]},
+        {?FLUSH, []}
+    ]),
     {noreply, State};
 
-command({execute_batch, Batch}, State) ->
-    #state{mod = Mod, sock = Sock, codec = Codec} = State,
-    BindExecute =
-        lists:map(
-          fun({Statement, Parameters}) ->
+command({execute_batch, Batch}, #state{codec = Codec} = State) ->
+    Commands =
+        lists:foldr(
+          fun({Statement, Parameters}, Acc) ->
                   #statement{name = StatementName,
                              columns = Columns,
                              types = Types} = Statement,
                   Typed_Parameters = lists:zip(Types, Parameters),
                   Bin1 = epgsql_wire:encode_parameters(Typed_Parameters, Codec),
                   Bin2 = epgsql_wire:encode_formats(Columns),
-                  [epgsql_wire:encode(?BIND, [0, StatementName, 0,
-                                             Bin1, Bin2]),
-                   epgsql_wire:encode(?EXECUTE, [0, <<0:?int32>>])]
+                  [{?BIND, [0, StatementName, 0, Bin1, Bin2]},
+                   {?EXECUTE, [0, <<0:?int32>>]} | Acc]
           end,
+          [{?SYNC, []}],
           Batch),
-    Sync = epgsql_wire:encode(?SYNC, []),
-    do_send(Mod, Sock, [BindExecute, Sync]),
+    send_multi(State, Commands),
     {noreply, State};
 
 command({describe_statement, Name}, State) ->
-    send(State, ?DESCRIBE, [?PREPARED_STATEMENT, Name, 0]),
-    send(State, ?FLUSH, []),
+    send_multi(State, [
+        {?DESCRIBE, [?PREPARED_STATEMENT, Name, 0]},
+        {?FLUSH, []}
+    ]),
     {noreply, State};
 
 command({describe_portal, Name}, State) ->
-    send(State, ?DESCRIBE, [?PORTAL, Name, 0]),
-    send(State, ?FLUSH, []),
+    send_multi(State, [
+        {?DESCRIBE, [?PORTAL, Name, 0]},
+        {?FLUSH, []}
+    ]),
     {noreply, State};
 
 command({close, Type, Name}, State) ->
@@ -288,8 +298,10 @@ command({close, Type, Name}, State) ->
         statement -> ?PREPARED_STATEMENT;
         portal    -> ?PORTAL
     end,
-    send(State, ?CLOSE, [Type2, Name, 0]),
-    send(State, ?FLUSH, []),
+    send_multi(State, [
+        {?CLOSE, [Type2, Name, 0]},
+        {?FLUSH, []}
+    ]),
     {noreply, State};
 
 command(sync, State) ->
@@ -325,6 +337,11 @@ send(#state{mod = Mod, sock = Sock}, Data) ->
 send(#state{mod = Mod, sock = Sock}, Type, Data) ->
     do_send(Mod, Sock, epgsql_wire:encode(Type, Data)).
 
+send_multi(#state{mod = Mod, sock = Sock}, List) ->
+    do_send(Mod, Sock, lists:map(fun({Type, Data}) ->
+        epgsql_wire:encode(Type, Data)
+    end, List)).
+
 do_send(gen_tcp, Sock, Bin) ->
     try erlang:port_command(Sock, Bin) of
         true ->
@@ -689,14 +706,19 @@ on_message({?READY_FOR_QUERY, <<Status:8>>}, State) ->
              end,
     {noreply, State2#state{txstatus = Status}};
 
-on_message(Error = {error, _}, State) ->
-    State2 = case command_tag(State) of
-                 C when C == squery; C == equery; C == execute_batch; C == prepared_query ->
-                     add_result(State, Error, Error);
-                 _ ->
-                     sync_required(finish(State, Error))
-             end,
-    {noreply, State2};
+on_message(Error = {error, Reason}, State) ->
+    case queue:is_empty(State#state.queue) of
+        true ->
+            {stop, {shutdown, Reason}, State};
+        false ->
+            State2 = case command_tag(State) of
+                C when C == squery; C == equery; C == execute_batch; C == prepared_query ->
+                    add_result(State, Error, Error);
+                _ ->
+                    sync_required(finish(State, Error))
+            end,
+            {noreply, State2}
+    end;
 
 %% NoticeResponse
 on_message({?NOTICE, Data}, State) ->

+ 2 - 0
src/epgsql_types.erl

@@ -91,6 +91,7 @@ oid2type(2280)  -> language_handler;
 oid2type(2281)  -> internal;
 oid2type(2282)  -> opaque;
 oid2type(2283)  -> anyelement;
+oid2type(2287)  -> {array, record};
 oid2type(2776)  -> anynonarray;
 oid2type(2950)  -> uuid;
 oid2type(2951)  -> {array, uuid};
@@ -187,6 +188,7 @@ type2oid(language_handler)      -> 2280;
 type2oid(internal)              -> 2281;
 type2oid(opaque)                -> 2282;
 type2oid(anyelement)            -> 2283;
+type2oid({array, record})       -> 2287;
 type2oid(anynonarray)           -> 2776;
 type2oid(uuid)                  -> 2950;
 type2oid({array, uuid})         -> 2951;

+ 4 - 1
src/epgsql_wire.erl

@@ -59,9 +59,12 @@ decode_fields(<<Type:8, Rest/binary>>, Acc) ->
 %% TODO add fields from http://www.postgresql.org/docs/9.0/interactive/protocol-error-fields.html
 decode_error(Bin) ->
     Fields = decode_fields(Bin),
+    ErrCode = proplists:get_value($C, Fields),
+    ErrName = epgsql_errcodes:to_name(ErrCode),
     Error = #error{
       severity = lower_atom(proplists:get_value($S, Fields)),
-      code     = proplists:get_value($C, Fields),
+      code     = ErrCode,
+      codename = ErrName,
       message  = proplists:get_value($M, Fields),
       extra    = decode_error_extra(Fields)},
     Error.

+ 51 - 6
test/epgsql_tests.erl

@@ -21,9 +21,13 @@
 -define(UUID3,
         <<198,188,155,66,149,151,17,227,138,98,112,24,139,130,16,73>>).
 
--define(TIMEOUT_ERROR,
-        {error,{error,error,<<"57014">>,
-                <<"canceling statement due to statement timeout">>,[]}}).
+-define(TIMEOUT_ERROR, {error, #error{
+        severity = error,
+        code = <<"57014">>,
+        codename = query_canceled,
+        message = <<"canceling statement due to statement timeout">>,
+        extra = []
+        }}).
 
 %% From uuid.erl in http://gitorious.org/avtobiff/erlang-uuid
 uuid_to_string(<<U0:32, U1:16, U2:16, U3:16, U4:48>>) ->
@@ -300,7 +304,7 @@ parse_and_close_test(Module) ->
       fun(C) ->
               Parse = fun() -> Module:parse(C, "test", "select * from test_table1", []) end,
               {ok, S} = Parse(),
-              {error, #error{code = <<"42P05">>}} = Parse(),
+              {error, #error{code = <<"42P05">>, codename = duplicate_prepared_statement}} = Parse(),
               Module:close(C, S),
               {ok, S} = Parse(),
               ok = Module:sync(C)
@@ -345,7 +349,7 @@ bind_and_close_test(Module) ->
       fun(C) ->
               {ok, S} = Module:parse(C, "select * from test_table1"),
               ok = Module:bind(C, S, "one", []),
-              {error, #error{code = <<"42P03">>}} = Module:bind(C, S, "one", []),
+              {error, #error{code = <<"42P03">>, codename = duplicate_cursor}} = Module:bind(C, S, "one", []),
               ok = Module:close(C, portal, "one"),
               ok = Module:bind(C, S, "one", []),
               ok = Module:sync(C)
@@ -357,7 +361,7 @@ execute_error_test(Module) ->
       fun(C) ->
           {ok, S} = Module:parse(C, "insert into test_table1 (id, value) values ($1, $2)"),
           ok = Module:bind(C, S, [1, <<"foo">>]),
-          {error, #error{code = <<"23505">>}} = Module:execute(C, S, 0),
+          {error, #error{code = <<"23505">>, codename = unique_violation}} = Module:execute(C, S, 0),
           {error, sync_required} = Module:bind(C, S, [3, <<"quux">>]),
           ok = Module:sync(C),
           ok = Module:bind(C, S, [3, <<"quux">>]),
@@ -607,6 +611,8 @@ array_type_test(Module) ->
       Module,
       fun(C) ->
           {ok, _, [{[1, 2]}]} = Module:equery(C, "select ($1::int[])[1:2]", [[1, 2, 3]]),
+          {ok, _, [{[{1, <<"one">>}, {2, <<"two">>}]}]} =
+              Module:equery(C, "select Array(select (id, value) from test_table1)", []),
           Select = fun(Type, A) ->
                        Query = "select $1::" ++ atom_to_list(Type) ++ "[]",
                        {ok, _Cols, [{A2}]} = Module:equery(C, Query, [A]),
@@ -639,6 +645,21 @@ array_type_test(Module) ->
           Select(inet, [{127,0,0,1}, {0,0,0,0,0,0,0,1}])
       end).
 
+custom_types_test(Module) ->
+    with_connection(
+      Module,
+      fun(C) ->
+              Module:squery(C, "drop table if exists t_foo;"),
+              Module:squery(C, "drop type foo;"),
+              {ok, [], []} = Module:squery(C, "create type foo as enum('foo', 'bar');"),
+              ok = epgsql:update_type_cache(C, [<<"foo">>]),
+              {ok, [], []} = Module:squery(C, "create table t_foo (col foo);"),
+              {ok, S} = Module:parse(C, "insert_foo", "insert into t_foo values ($1)", [foo]),
+              ok = Module:bind(C, S, ["bar"]),
+              {ok, 1} = Module:execute(C, S)
+
+      end).
+
 text_format_test(Module) ->
     with_connection(
       Module,
@@ -699,6 +720,30 @@ connection_closed_test(Module) ->
     end,
     flush().
 
+connection_closed_by_server_test(Module) ->
+    with_connection(Module,
+        fun(C1) ->
+            P = self(),
+            spawn_link(fun() ->
+                process_flag(trap_exit, true),
+                with_connection(Module,
+                    fun(C2) ->
+                        {ok, _, [{Pid}]} = Module:equery(C2, "select pg_backend_pid()"),
+                        % emulate of disconnection
+                        {ok, _, [{true}]} = Module:equery(C1,
+                            "select pg_terminate_backend($1)", [Pid]),
+                        receive
+                            {'EXIT', C2, {shutdown, #error{code = <<"57P01">>}}} ->
+                                P ! ok;
+                            Other ->
+                                ?debugFmt("Unexpected msg: ~p~n", [Other]),
+                                P ! error
+                        end
+                    end)
+            end),
+            receive ok -> ok end
+        end).
+
 active_connection_closed_test(Module) ->
     P = self(),
     F = fun() ->