Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix OperatorInfo decoder when type is partitionedOutputInfo #45

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 162 additions & 28 deletions lib/presto/client/model_versions/0.149.rb
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ def self.decode(hash)
when "groupid" then GroupIdNode
when "explainAnalyze" then ExplainAnalyzeNode
when "apply" then ApplyNode
when "assignUniqueId" then AssignUniqueId
end
if model_class
node = model_class.decode(hash)
Expand Down Expand Up @@ -179,60 +180,63 @@ def decode(hash)
end
obj = allocate
model_class = case hash["@type"]
when "CreateHandle" then OutputTableHandle
when "InsertHandle" then InsertTableHandle
when "DeleteHandle" then TableHandle
when "CreateHandle" then CreateHandle
when "InsertHandle" then InsertHandle
when "DeleteHandle" then DeleteHandle
end
if model_class
model_class.decode(hash)
end
obj.send(:initialize_struct,
hash["@type"],
model_class.decode(hash['handle'])
)
obj
end
end

class << DeleteHandle =
Base.new(:handle)
def decode(hash)
# Inner classes
module OperatorInfo
def self.decode(hash)
unless hash.is_a?(Hash)
raise TypeError, "Can't convert #{hash.class} to Hash"
end
obj = allocate
obj.send(:initialize_struct,
TableHandle.decode(hash['handle'])
)
obj
model_class = case hash["@type"]
when "exchangeClientStatus" then ExchangeClientStatus
when "localExchangeBuffer" then LocalExchangeBufferInfo
when "tableFinish" then TableFinishInfo
when "splitOperator" then SplitOperatorInfo
when "hashCollisionsInfo" then HashCollisionsInfo
when "partitionedOutput" then PartitionedOutputInfo
end
if model_class
model_class.decode(hash)
end
end
end

# Inner classes
class << Specification =
Base.new(:partition_by, :order_by, :orderings, :frame, :pages_added)
class << HashCollisionsInfo =
Base.new(:weighted_hash_collisions, :weighted_sum_squared_hash_collisions, :weighted_expectedHash_collisions)
def decode(hash)
unless hash.is_a?(Hash)
raise TypeError, "Can't convert #{hash.class} to Hash"
end
obj = allocate
obj.send(:initialize_struct,
hash["partitionBy"],
hash["orderBy"],
hash["orderings"],
hash["frame"],
hash["weighted_hash_collisions"],
hash["weighted_sum_squared_hash_collisions"],
hash["weighted_expectedHash_collisions"]
)
obj
end
end

class << ArgumentBinding =
Base.new(:column, :constant)
class << PartitionedOutputInfo =
Base.new(:rows_added, :pages_added, :output_buffer_peak_memory_usage)
def decode(hash)
unless hash.is_a?(Hash)
raise TypeError, "Can't convert #{hash.class} to Hash"
end
obj = allocate
obj.send(:initialize_struct,
hash["column"],
hash["constant"]
hash["rowsAdded"],
hash["pagesAdded"],
hash["outputBufferPeakMemoryUsage"]
)
obj
end
Expand Down Expand Up @@ -283,6 +287,21 @@ def decode(hash)
end
end

class << ArgumentBinding =
Base.new(:column, :constant)
def decode(hash)
unless hash.is_a?(Hash)
raise TypeError, "Can't convert #{hash.class} to Hash"
end
obj = allocate
obj.send(:initialize_struct,
hash["column"],
hash["constant"],
)
obj
end
end

class << BufferInfo =
Base.new(:buffer_id, :finished, :buffered_pages, :pages_sent, :page_buffer_info)
def decode(hash)
Expand Down Expand Up @@ -391,6 +410,34 @@ def decode(hash)
end
end

class << CreateHandle =
Base.new(:handle)
def decode(hash)
unless hash.is_a?(Hash)
raise TypeError, "Can't convert #{hash.class} to Hash"
end
obj = allocate
obj.send(:initialize_struct,
hash["handle"] && OutputTableHandle.decode(hash["handle"]),
)
obj
end
end

class << DeleteHandle =
Base.new(:handle)
def decode(hash)
unless hash.is_a?(Hash)
raise TypeError, "Can't convert #{hash.class} to Hash"
end
obj = allocate
obj.send(:initialize_struct,
hash["handle"] && TableHandle.decode(hash["handle"]),
)
obj
end
end

class << DeleteNode =
Base.new(:id, :source, :target, :row_id, :outputs)
def decode(hash)
Expand Down Expand Up @@ -506,6 +553,24 @@ def decode(hash)
end
end

class << ExchangeClientStatus =
Base.new(:buffered_bytes, :average_bytes_per_request, :buffered_pages, :no_more_locations, :page_buffer_client_statuses)
def decode(hash)
unless hash.is_a?(Hash)
raise TypeError, "Can't convert #{hash.class} to Hash"
end
obj = allocate
obj.send(:initialize_struct,
hash["bufferedBytes"],
hash["averageBytesPerRequest"],
hash["bufferedPages"],
hash["noMoreLocations"],
hash["pageBufferClientStatuses"] && hash["pageBufferClientStatuses"].map {|h| PageBufferClientStatus.decode(h) },
)
obj
end
end

class << ExchangeNode =
Base.new(:id, :type, :scope, :partitioning_scheme, :sources, :inputs)
def decode(hash)
Expand Down Expand Up @@ -687,6 +752,20 @@ def decode(hash)
end
end

class << InsertHandle =
Base.new(:handle)
def decode(hash)
unless hash.is_a?(Hash)
raise TypeError, "Can't convert #{hash.class} to Hash"
end
obj = allocate
obj.send(:initialize_struct,
hash["handle"] && InsertTableHandle.decode(hash["handle"]),
)
obj
end
end

class << InsertTableHandle =
Base.new(:connector_id, :transaction_handle, :connector_handle)
def decode(hash)
Expand Down Expand Up @@ -741,6 +820,21 @@ def decode(hash)
end
end

class << LocalExchangeBufferInfo =
Base.new(:buffered_bytes, :buffered_pages)
def decode(hash)
unless hash.is_a?(Hash)
raise TypeError, "Can't convert #{hash.class} to Hash"
end
obj = allocate
obj.send(:initialize_struct,
hash["bufferedBytes"],
hash["bufferedPages"],
)
obj
end
end

class << LongVariableConstraint =
Base.new(:name, :expression)
def decode(hash)
Expand Down Expand Up @@ -861,6 +955,28 @@ def decode(hash)
end
end

class << PageBufferClientStatus =
Base.new(:uri, :state, :last_update, :rows_received, :pages_received, :requests_scheduled, :requests_completed, :requests_failed, :http_request_state)
def decode(hash)
unless hash.is_a?(Hash)
raise TypeError, "Can't convert #{hash.class} to Hash"
end
obj = allocate
obj.send(:initialize_struct,
hash["uri"],
hash["state"],
hash["lastUpdate"],
hash["rowsReceived"],
hash["pagesReceived"],
hash["requestsScheduled"],
hash["requestsCompleted"],
hash["requestsFailed"],
hash["httpRequestState"],
)
obj
end
end

class << PageBufferInfo =
Base.new(:partition, :buffered_pages, :buffered_bytes, :rows_added, :pages_added)
def decode(hash)
Expand Down Expand Up @@ -1026,7 +1142,7 @@ def decode(hash)
end

class << QueryInfo =
Base.new(:query_id, :session, :state, :memory_pool, :scheduled, :self, :field_names, :query, :query_stats, :set_session_properties, :reset_session_properties, :added_prepared_statements, :deallocated_prepared_statements, :started_transaction_id, :clear_transaction_id, :update_type, :output_stage, :failure_info, :error_code, :inputs)
Base.new(:query_id, :session, :state, :memory_pool, :scheduled, :self, :field_names, :query, :query_stats, :set_session_properties, :reset_session_properties, :added_prepared_statements, :deallocated_prepared_statements, :started_transaction_id, :clear_transaction_id, :update_type, :output_stage, :failure_info, :error_code, :inputs, :final_query_info)
def decode(hash)
unless hash.is_a?(Hash)
raise TypeError, "Can't convert #{hash.class} to Hash"
Expand All @@ -1053,6 +1169,7 @@ def decode(hash)
hash["failureInfo"] && FailureInfo.decode(hash["failureInfo"]),
hash["errorCode"] && ErrorCode.decode(hash["errorCode"]),
hash["inputs"] && hash["inputs"].map {|h| Input.decode(h) },
hash["finalQueryInfo"],
)
obj
end
Expand Down Expand Up @@ -1288,6 +1405,23 @@ def decode(hash)
end
end

class << Specification =
Base.new(:partition_by, :order_by, :orderings, :frame)
def decode(hash)
unless hash.is_a?(Hash)
raise TypeError, "Can't convert #{hash.class} to Hash"
end
obj = allocate
obj.send(:initialize_struct,
hash["partitionBy"],
hash["orderBy"],
hash["orderings"] && Hash[hash["orderings"].to_a.map! {|k,v| [k, v.downcase.to_sym] }],
hash["frame"],
)
obj
end
end

class << StageInfo =
Base.new(:stage_id, :state, :self, :plan, :types, :stage_stats, :tasks, :sub_stages, :failure_cause)
def decode(hash)
Expand Down
Loading