Skip to content

Commit

Permalink
Fix invalid type error after failover (#555)
Browse files Browse the repository at this point in the history
  • Loading branch information
chaodhib authored and josevalim committed Jul 27, 2021
1 parent d9a3795 commit 896782d
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 26 deletions.
28 changes: 13 additions & 15 deletions lib/postgrex/protocol.ex
Original file line number Diff line number Diff line change
Expand Up @@ -371,18 +371,14 @@ defmodule Postgrex.Protocol do
lock_error(s, :execute, query)
end

defp handle_execute_result(%{types: types} = query, params, opts, %{types: types} = s) do
defp handle_execute_result(query, params, opts, s) do
if query_member?(s, query) do
rebind_execute(s, new_status(opts), query, params)
else
handle_prepare_execute(query, params, opts, s)
end
end

defp handle_execute_result(query, _, _, s) do
query_error(s, "query #{inspect(query)} has invalid types for the connection")
end

defp handle_execute_copy(query, params, opts, s) do
%{connection_id: connection_id} = s

Expand Down Expand Up @@ -1348,7 +1344,13 @@ defmodule Postgrex.Protocol do
]
end

defp recv_parse_describe(s, status, %Query{ref: nil} = query, buffer) do
defp recv_parse_describe(
%{types: protocol_types} = s,
status,
%Query{ref: ref, types: query_types} = query,
buffer
)
when ref == nil or protocol_types != query_types do
with {:ok, s, buffer} <- recv_parse(s, status, buffer),
{:ok, param_oids, result_oids, columns, s, buffer} <- recv_describe(s, status, buffer) do
describe(s, query, param_oids, result_oids, columns, buffer)
Expand Down Expand Up @@ -1813,10 +1815,6 @@ defmodule Postgrex.Protocol do

## execute

defp query_error(s, msg) do
{:error, Postgrex.QueryError.exception(msg), s}
end

defp lock_error(s, fun) do
msg = "connection is locked copying to or from the database and can not #{fun} transaction"

Expand Down Expand Up @@ -2114,18 +2112,14 @@ defmodule Postgrex.Protocol do
lock_error(s, :bind, query)
end

defp handle_bind(%Query{types: types} = query, params, res, opts, %{types: types} = s) do
defp handle_bind(query, params, res, opts, s) do
if query_member?(s, query) do
rebind(s, new_status(opts), query, params, res)
else
handle_prepare_bind(query, params, res, opts, s)
end
end

defp handle_bind(%Query{} = query, _, _, _, s) do
query_error(s, "query #{inspect(query)} has invalid types for the connection")
end

defp handle_prepare_bind(%Query{name: ""} = query, params, res, opts, s) do
status = new_status(opts)

Expand Down Expand Up @@ -3348,6 +3342,10 @@ defmodule Postgrex.Protocol do
defp query_member?(%{queries: nil}, _), do: false
defp query_member?(_, %{name: ""}), do: false

defp query_member?(%{types: protocol_types}, %Query{types: query_types})
when protocol_types != query_types,
do: false

defp query_member?(%{queries: queries}, %Query{name: name, ref: ref}) do
try do
:ets.lookup_element(queries, name, 2)
Expand Down
20 changes: 9 additions & 11 deletions test/custom_extensions_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ defmodule CustomExtensionsTest do
import Postgrex.TestHelper
import ExUnit.CaptureLog
alias Postgrex, as: P
alias Postgrex.Result

@types CustomExtensionsTypes

Expand Down Expand Up @@ -137,39 +138,36 @@ defmodule CustomExtensionsTest do
end) =~ "(RuntimeError) decode"
end

test "raise when executing prepared query on connection with different types", context do
test "execute prepared query on connection with different types", context do
query = prepare("S42", "SELECT 42")

opts = [types: Postgrex.DefaultTypes] ++ context[:options]
{:ok, pid2} = Postgrex.start_link(opts)

{:error, %Postgrex.QueryError{message: message}} = Postgrex.execute(pid2, query, [])
assert message =~ ~r"invalid types for the connection"
{:ok, %Postgrex.Query{}, %Result{rows: [[42]]}} = Postgrex.execute(pid2, query, [])
end

test "raise when streaming prepared query on connection with different types", context do
test "stream prepared query on connection with different types", context do
query = prepare("S42", "SELECT 42")

opts = [types: Postgrex.DefaultTypes] ++ context[:options]
{:ok, pid2} = Postgrex.start_link(opts)

Postgrex.transaction(pid2, fn conn ->
assert_raise Postgrex.QueryError, ~r"invalid types for the connection", fn ->
stream(query, []) |> Enum.take(1)
end
assert [%Result{rows: [[42]]}] = stream(query, []) |> Enum.take(1)
end)
end

test "raise when streaming prepared COPY FROM on connection with different types", context do
test "stream prepared COPY FROM on connection with different types", context do
query = prepare("copy", "COPY uniques FROM STDIN")

opts = [types: Postgrex.DefaultTypes] ++ context[:options]
{:ok, pid2} = Postgrex.start_link(opts)

Postgrex.transaction(pid2, fn conn ->
assert_raise Postgrex.QueryError, ~r"invalid types for the connection", fn ->
Enum.into(["1\n"], stream(query, []))
end
stream = stream(query, [])
assert Enum.into(["1\n"], stream) == stream
Postgrex.rollback(conn, :done)
end)
end

Expand Down

0 comments on commit 896782d

Please sign in to comment.