Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cast decimals to ruby BigDecimals #48

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions lib/presto/client/statement_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#
module Presto::Client

require 'bigdecimal'
require 'json'
require 'msgpack'
require 'presto/client/models'
Expand Down Expand Up @@ -42,7 +43,8 @@ def initialize(faraday, query, options, next_uri=nil)

if next_uri
response = faraday_get_with_retry(next_uri)
@results = @models::QueryResults.decode(parse_body(response))
decoded = @models::QueryResults.decode(parse_body(response))
@results = cast_results(decoded)
else
post_query_request!
end
Expand All @@ -69,7 +71,8 @@ def post_query_request!
raise PrestoHttpError.new(response.status, "Failed to start query: #{response.body} (#{response.status})")
end

@results = decode_model(uri, parse_body(response), @models::QueryResults)
decoded = decode_model(uri, parse_body(response), @models::QueryResults)
@results = cast_results(decoded)
end

private :post_query_request!
Expand Down Expand Up @@ -113,7 +116,8 @@ def advance
uri = @results.next_uri

response = faraday_get_with_retry(uri)
@results = decode_model(uri, parse_body(response), @models::QueryResults)
decoded = decode_model(uri, parse_body(response), @models::QueryResults)
@results = cast_results(decoded)

return true
end
Expand All @@ -124,6 +128,26 @@ def query_info
decode_model(uri, parse_body(response), @models::QueryInfo)
end

def cast_results(query_results)
data = query_results.data
columns = query_results.columns
return query_results if data.nil? || columns.nil?

decimal_indices = columns.each_with_index.select { |c, i| c.type.start_with?('decimal(') }.map(&:last)
if decimal_indices.empty?
query_results
else
new_data = data.map do |row|
copy = row.dup
decimal_indices.each do |index|
copy[index] = BigDecimal(copy[index])
end
copy
end
query_results.class.new(query_results.to_h.merge(data: new_data))
end
end

def decode_model(uri, hash, body_class)
begin
body_class.decode(hash)
Expand Down
31 changes: 30 additions & 1 deletion spec/statement_client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,36 @@
StatementClient.new(faraday, query, options)
end

it "returns results correctly" do
query = 'select 1 as i, 1.0 as d'
stub_request(:post, "localhost/v1/statement").
with(body: query,
headers: {
"User-Agent" => "presto-ruby/#{VERSION}",
"X-Presto-Catalog" => options[:catalog],
"X-Presto-Schema" => options[:schema],
"X-Presto-User" => options[:user],
"X-Presto-Language" => options[:language],
"X-Presto-Time-Zone" => options[:time_zone],
}).to_return(body: <<-JSON)
{"id":"queryid","infoUri":"http://localhost/ui/query.html?queryid","nextUri":"http://localhost/v1/statement/queryid/1","stats":{"state":"QUEUED","queued":true,"scheduled":false,"nodes":0,"totalSplits":0,"queuedSplits":0,"runningSplits":0,"completedSplits":0,"userTimeMillis":0,"cpuTimeMillis":0,"wallTimeMillis":0,"queuedTimeMillis":0,"elapsedTimeMillis":0,"processedRows":0,"processedBytes":0,"peakMemoryBytes":0}}
JSON
client = StatementClient.new(faraday, query, options)
client.current_results.data.should be_nil

stub_request(:get, "http://localhost/v1/statement/queryid/1").to_return(body: <<-JSON)
{"id":"queryid","infoUri":"http://localhost/ui/query.html?queryid","partialCancelUri":"http://192.168.128.77:8091/v1/stage/queryid.0","nextUri":"http://localhost/v1/statement/queryid/2","columns":[{"name":"i","type":"integer","typeSignature":{"rawType":"integer","typeArguments":[],"literalArguments":[],"arguments":[]}},{"name":"d","type":"decimal(2,1)","typeSignature":{"rawType":"decimal","typeArguments":[],"literalArguments":[],"arguments":[{"kind":"LONG_LITERAL","value":2},{"kind":"LONG_LITERAL","value":1}]}}],"data":[[1,"1.0"]],"stats":{"state":"RUNNING","queued":false,"scheduled":true,"nodes":1,"totalSplits":17,"queuedSplits":17,"runningSplits":0,"completedSplits":0,"userTimeMillis":0,"cpuTimeMillis":0,"wallTimeMillis":0,"queuedTimeMillis":1,"elapsedTimeMillis":121,"processedRows":0,"processedBytes":0,"peakMemoryBytes":0,"rootStage":{"stageId":"0","state":"RUNNING","done":false,"nodes":1,"totalSplits":17,"queuedSplits":17,"runningSplits":0,"completedSplits":0,"userTimeMillis":0,"cpuTimeMillis":0,"wallTimeMillis":0,"processedRows":0,"processedBytes":0,"subStages":[]},"progressPercentage":0.0}}
JSON
client.advance.should be_true
client.current_results.data.should == [[1, BigDecimal('1.0')]]

stub_request(:get, "http://localhost/v1/statement/queryid/2").to_return(body: <<-JSON)
{"id":"queryid","infoUri":"http://localhost/ui/query.html?queryid","columns":[{"name":"i","type":"integer","typeSignature":{"rawType":"integer","typeArguments":[],"literalArguments":[],"arguments":[]}},{"name":"d","type":"decimal(2,1)","typeSignature":{"rawType":"decimal","typeArguments":[],"literalArguments":[],"arguments":[{"kind":"LONG_LITERAL","value":2},{"kind":"LONG_LITERAL","value":1}]}}],"stats":{"state":"FINISHED","queued":false,"scheduled":true,"nodes":1,"totalSplits":17,"queuedSplits":0,"runningSplits":0,"completedSplits":17,"userTimeMillis":4,"cpuTimeMillis":5,"wallTimeMillis":109,"queuedTimeMillis":1,"elapsedTimeMillis":128,"processedRows":0,"processedBytes":0,"peakMemoryBytes":0,"rootStage":{"stageId":"0","state":"FINISHED","done":true,"nodes":1,"totalSplits":17,"queuedSplits":0,"runningSplits":0,"completedSplits":17,"userTimeMillis":4,"cpuTimeMillis":5,"wallTimeMillis":109,"processedRows":1,"processedBytes":0,"subStages":[]},"progressPercentage":100.0}}
JSON
client.advance.should be_true
client.current_results.data.should be_nil
end

let :response_json2 do
{
id: "queryid",
Expand Down Expand Up @@ -402,4 +432,3 @@
StatementClient.new(faraday, query, options)
end
end