Skip to content

Commit b0002bf

Browse files
Yorhelwill
Yorhel
authored andcommitted
Also add support for COPY in + fix close after partial read + remaining_row_size
1 parent 8c0019c commit b0002bf

File tree

6 files changed

+135
-85
lines changed

6 files changed

+135
-85
lines changed

spec/pg/connection_spec.cr

+18-26
Original file line numberDiff line numberDiff line change
@@ -142,40 +142,32 @@ describe PG, "#clear_time_zone_cache" do
142142
end
143143
end
144144

145-
describe PG, "COPY out" do
146-
it "supports COPY TO STDOUT data transfer" do
145+
describe PG, "COPY" do
146+
it "properly handles partial reads and consumes data on early close" do
147147
with_connection do |db|
148-
io = db.copy_out "COPY (SELECT 'text', NULL, 1) TO STDOUT"
149-
io.gets_to_end.should eq "text\t\\N\t1\n"
148+
io = db.exec_copy "COPY (VALUES (1), (333)) TO STDOUT"
149+
io.read_char.should eq '1'
150+
io.read_char.should eq '\n'
151+
io.read_char.should eq '3'
152+
io.read_char.should eq '3'
150153
io.close
151154
db.scalar("select 1").should eq(1)
152155
end
153156
end
154157

155-
it "propely consumes data on early close" do
158+
if "survives a COPY FROM STDIN and COPY TO STDOUT round-trip"
156159
with_connection do |db|
157-
io = db.copy_out "COPY (SELECT * FROM generate_series(1, 100) x) TO STDOUT"
158-
io.gets.should eq "1"
159-
io.gets.should eq "2"
160-
io.gets.should eq "3"
161-
io.close
162-
db.scalar("select 1").should eq(1)
163-
end
164-
end
160+
data = "123\tdata\n\\N\t\\N\n"
161+
db.exec("CREATE TEMPORARY TABLE IF NOT EXISTS copy_test (a int, b text)")
165162

166-
it "properly handles partial reads" do
167-
with_connection do |db|
168-
io = db.copy_out "COPY (VALUES (1), (333)) TO STDOUT"
169-
io.read_char.should eq '1'
170-
io.read_char.should eq '\n'
171-
io.read_char.should eq '3'
172-
io.read_char.should eq '3'
173-
io.read_char.should eq '3'
174-
io.read_char.should eq '\n'
175-
io.read_char.should eq nil
176-
io.read_char.should eq nil
177-
io.close
178-
db.scalar("select 1").should eq(1)
163+
wr = db.exec_copy "COPY copy_test FROM STDIN"
164+
wr << data
165+
wr.close
166+
167+
rd = db.exec_copy "COPY copy_test TO STDOUT"
168+
rd.gets_to_end.should eq data
169+
170+
db.exec("DROP TABLE copy_test")
179171
end
180172
end
181173
end

src/pg/connection.cr

+11-7
Original file line numberDiff line numberDiff line change
@@ -37,16 +37,20 @@ module PG
3737
nil
3838
end
3939

40-
# Execute a "COPY .. TO STDOUT" query and return an IO object to read from.
41-
# The IO *must* be closed before using the connection again.
40+
# Execute a "COPY" query and return an IO object to read from or write to,
41+
# depending on the query.
4242
#
4343
# ```
44-
# io = conn.copy_out "COPY table TO STDOUT"
45-
# data = io.gets_to_end
46-
# io.close
44+
# data = conn.exec_copy("COPY table TO STDOUT").gets_to_end
4745
# ```
48-
def copy_out(query : String) : CopyOut
49-
CopyOut.new connection, query
46+
#
47+
# ```
48+
# writer = conn.exec_copy "COPY table FROM STDIN")
49+
# writer << data
50+
# writer.close
51+
# ```
52+
def exec_copy(query : String) : CopyResult
53+
CopyResult.new connection, query
5054
end
5155

5256
# Set the callback block for notices and errors.

src/pg/copy_out.cr

-52
This file was deleted.

src/pg/copy_result.cr

+90
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
# IO object obtained through PG::Connection.exec_copy.
2+
class PG::CopyResult < IO
3+
getter? closed : Bool
4+
5+
def initialize(@connection : PQ::Connection, query : String)
6+
@connection.send_query_message query
7+
response = @connection.expect_frame PQ::Frame::CopyOutResponse | PQ::Frame::CopyInResponse
8+
9+
@reading = response.is_a? PQ::Frame::CopyOutResponse
10+
@frame_size = 0
11+
@end = false
12+
@closed = false
13+
end
14+
15+
private def read_final(done)
16+
return if @end
17+
@end = true
18+
19+
unless done
20+
@connection.skip_bytes @frame_size if @frame_size > 0
21+
22+
while @connection.read_next_copy_start
23+
size = @connection.read_i32 - 4
24+
@connection.skip_bytes size
25+
end
26+
end
27+
28+
@connection.expect_frame PQ::Frame::CommandComplete
29+
@connection.expect_frame PQ::Frame::ReadyForQuery
30+
end
31+
32+
# Returns the number of remaining bytes in the current row.
33+
# Returns 0 the are no more rows to be read.
34+
# This can be used to allocate the precise amount of memory to read a complete row.
35+
#
36+
# ```
37+
# size = io.remaining_row_size
38+
# if size != 0
39+
# row = Bytes.new(size)
40+
# io.read(row)
41+
# # Process the row.
42+
# end
43+
# ```
44+
def remaining_row_size : Int32
45+
raise "Can't read from a write-only PG::CopyResult" unless @reading
46+
check_open
47+
48+
return 0 if @end
49+
50+
if @frame_size == 0
51+
if @connection.read_next_copy_start
52+
@frame_size = @connection.read_i32 - 4
53+
else
54+
read_final true
55+
return 0
56+
end
57+
end
58+
59+
@frame_size
60+
end
61+
62+
def read(slice : Bytes) : Int32
63+
return 0 if slice.empty?
64+
65+
remaining = remaining_row_size
66+
return 0 if remaining == 0
67+
68+
max_bytes = slice.size > remaining ? remaining : slice.size
69+
bytes = @connection.read_direct(slice[0..max_bytes - 1])
70+
@frame_size -= bytes
71+
bytes
72+
end
73+
74+
def write(slice : Bytes) : Nil
75+
raise "Can't write to a read-only PG::CopyResult" if @reading
76+
@connection.send_copy_data_message slice
77+
end
78+
79+
def close : Nil
80+
return if @closed
81+
if @reading
82+
read_final false
83+
else
84+
@connection.send_copy_done_message
85+
@connection.expect_frame PQ::Frame::CommandComplete
86+
@connection.expect_frame PQ::Frame::ReadyForQuery
87+
end
88+
@closed = true
89+
end
90+
end

src/pq/connection.cr

+12
Original file line numberDiff line numberDiff line change
@@ -550,5 +550,17 @@ module PQ
550550
write_chr 'X'
551551
write_i32 4
552552
end
553+
554+
def send_copy_data_message(slice)
555+
write_chr 'd'
556+
write_i32 4 + slice.size
557+
soc.write slice
558+
end
559+
560+
def send_copy_done_message
561+
write_chr 'c'
562+
write_i32 4
563+
soc.flush
564+
end
553565
end
554566
end

src/pq/frame.cr

+4
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ module PQ
2222
when 'K' then BackendKeyData
2323
when 'R' then Authentication
2424
when 'c' then CopyDone
25+
when 'G' then CopyInResponse
2526
when 'H' then CopyOutResponse
2627
else nil
2728
end
@@ -251,6 +252,9 @@ module PQ
251252
struct CopyDone < Frame
252253
end
253254

255+
struct CopyInResponse < Frame
256+
end
257+
254258
struct CopyOutResponse < Frame
255259
end
256260
end

0 commit comments

Comments
 (0)