Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion spannerlib/wrappers/spannerlib-ruby/Rakefile
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,3 @@ task :compile do
end

task default: %i[compile spec rubocop]

Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# frozen_string_literal: true

require_relative "ffi"
require_relative "rows"

class Connection
attr_reader :pool_id, :conn_id
Expand All @@ -35,8 +36,7 @@ def write_mutations(mutation_group)
else
mutation_group.to_s
end

SpannerLib.write_mutations(@pool_id, @conn_id, req_bytes)
SpannerLib.write_mutations(@pool_id, @conn_id, req_bytes, proto_klass: Google::Cloud::Spanner::V1::CommitResponse)
end

# Begin a read/write transaction on this connection. Accepts TransactionOptions proto or bytes.
Expand All @@ -52,7 +52,7 @@ def begin_transaction(transaction_options = nil)

# Commit the current transaction. Returns CommitResponse bytes or nil.
def commit
SpannerLib.commit(@pool_id, @conn_id)
SpannerLib.commit(@pool_id, @conn_id, proto_klass: Google::Cloud::Spanner::V1::CommitResponse)
end

# Rollback the current transaction.
Expand All @@ -68,7 +68,8 @@ def execute(request)
else
request.is_a?(String) ? request : request.to_s
end
SpannerLib.execute(@pool_id, @conn_id, bytes)
rows_id = SpannerLib.execute(@pool_id, @conn_id, bytes)
SpannerLib::Rows.new(self, rows_id)
end

# Execute batch DML/DDL request. Returns ExecuteBatchDmlResponse bytes (or nil).
Expand All @@ -78,24 +79,8 @@ def execute_batch(request)
else
request.is_a?(String) ? request : request.to_s
end
SpannerLib.execute_batch(@pool_id, @conn_id, bytes)
end

# Rows helpers — return raw message bytes (caller should parse them).
def metadata(rows_id)
SpannerLib.metadata(@pool_id, @conn_id, rows_id)
end

def next_rows(rows_id, num_rows, encoding = 0)
SpannerLib.next(@pool_id, @conn_id, rows_id, num_rows, encoding)
end

def result_set_stats(rows_id)
SpannerLib.result_set_stats(@pool_id, @conn_id, rows_id)
end

def close_rows(rows_id)
SpannerLib.close_rows(@pool_id, @conn_id, rows_id)
SpannerLib.execute_batch(@pool_id, @conn_id, bytes, proto_klass: Google::Cloud::Spanner::V1::ExecuteBatchDmlResponse)
end

# Closes this connection. Any active transaction on the connection is rolled back.
Expand Down
50 changes: 17 additions & 33 deletions spannerlib/wrappers/spannerlib-ruby/lib/spannerlib/ffi.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
require "google/rpc/status_pb"

require "ffi"
require_relative "message_handler"

module SpannerLib
extend FFI::Library
Expand Down Expand Up @@ -128,45 +129,25 @@ def self.ensure_release(message)
end
end

def self.handle_object_id_response(message, func_name)
def self.handle_object_id_response(message, _func_name)
ensure_release(message) do
if message[:code] != 0
error_msg = read_error_message(message)
raise "#{func_name} failed with code #{message[:code]}: #{error_msg}"
end
message[:objectId]
MessageHandler.new(message).object_id
end
end

def self.handle_status_response(message, func_name)
def self.handle_status_response(message, _func_name)
ensure_release(message) do
if message[:code] != 0
error_msg = read_error_message(message)
raise "#{func_name} failed with code #{message[:code]}: #{error_msg}"
end
MessageHandler.new(message).throw_if_error!
end
nil
end

# rubocop:disable Metrics/MethodLength
def self.handle_data_response(message, func_name)
def self.handle_data_response(message, _func_name, options = {})
proto_klass = options[:proto_klass]
ensure_release(message) do
if message[:code] != 0
error_msg = read_error_message(message)
raise "#{func_name} failed with code #{message[:code]}: #{error_msg}"
end

len = message[:length]
ptr = message[:pointer]

if len.positive? && !ptr.null?
ptr.read_bytes(len)
else
""
end
MessageHandler.new(message).data(proto_klass: proto_klass)
end
end
# rubocop:enable Metrics/MethodLength

# rubocop:disable Metrics/MethodLength
def self.read_error_message(message)
Expand All @@ -187,10 +168,11 @@ def self.read_error_message(message)
end
# rubocop:enable Metrics/MethodLength

def self.write_mutations(pool_id, conn_id, proto_bytes)
def self.write_mutations(pool_id, conn_id, proto_bytes, options = {})
proto_klass = options[:proto_klass]
with_gobytes(proto_bytes) do |gobytes|
message = WriteMutations(pool_id, conn_id, gobytes)
handle_data_response(message, "WriteMutations")
handle_data_response(message, "WriteMutations", proto_klass: proto_klass)
end
end

Expand All @@ -201,9 +183,10 @@ def self.begin_transaction(pool_id, conn_id, proto_bytes)
end
end

def self.commit(pool_id, conn_id)
def self.commit(pool_id, conn_id, options = {})
proto_klass = options[:proto_klass]
message = Commit(pool_id, conn_id)
handle_data_response(message, "Commit")
handle_data_response(message, "Commit", proto_klass: proto_klass)
end

def self.rollback(pool_id, conn_id)
Expand All @@ -218,10 +201,11 @@ def self.execute(pool_id, conn_id, proto_bytes)
end
end

def self.execute_batch(pool_id, conn_id, proto_bytes)
def self.execute_batch(pool_id, conn_id, proto_bytes, options = {})
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a specific reason that this function accepts an options hash instead of a proto_klass?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah i thought in future we might pass some extra arguments so i thought to keep this options hash.

proto_klass = options[:proto_klass]
with_gobytes(proto_bytes) do |gobytes|
message = ExecuteBatch(pool_id, conn_id, gobytes)
handle_data_response(message, "ExecuteBatch")
handle_data_response(message, "ExecuteBatch", proto_klass: proto_klass)
end
end

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# frozen_string_literal: true

# lib/spannerlib/message_handler.rb

require "spannerlib/exceptions"

module SpannerLib
class MessageHandler
def initialize(message)
@message = message
end

def object_id
throw_if_error!
@message[:objectId]
end

# Returns the data payload from the message.
# If a proto_klass is provided, it decodes the bytes into a Protobuf object.
# Otherwise, it returns the raw bytes as a string.
def data(proto_klass: nil)
throw_if_error!

len = @message[:length]
ptr = @message[:pointer]

return (proto_klass ? proto_klass.new : "") unless len.positive? && !ptr.null?

bytes = ptr.read_string(len)

proto_klass ? proto_klass.decode(bytes) : bytes
end

def throw_if_error!
code = @message[:code]
return if code.zero?

error_msg = SpannerLib.read_error_message(@message)
raise SpannerLibException, "Call failed with code #{code}: #{error_msg}"
end
end
end
53 changes: 53 additions & 0 deletions spannerlib/wrappers/spannerlib-ruby/lib/spannerlib/rows.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# frozen_string_literal: true

module SpannerLib
class Rows
include Enumerable

attr_reader :id, :connection

def initialize(connection, rows_id)
@connection = connection
@id = rows_id
@closed = false
end

def each
return enum_for(:each) unless block_given?

while (row = self.next)
yield row
end
ensure
close
end

def next
return nil if @closed

row_data = SpannerLib.next(connection.pool_id, connection.conn_id, id, 1, 0)

if row_data.nil? || row_data.empty? || (row_data.respond_to?(:values) && row_data.values.empty?)
close
return nil
end

row_data
end

def metadata
SpannerLib.metadata(connection.pool_id, connection.conn_id, id)
end

def result_set_stats
SpannerLib.result_set_stats(connection.pool_id, connection.conn_id, id)
end

def close
return if @closed

SpannerLib.close_rows(connection.pool_id, connection.conn_id, id)
@closed = true
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# frozen_string_literal: true

require "spec_helper"
require "google/cloud/spanner/v1"

RSpec.describe "Batch API test", :integration do
before(:all) do
@emulator_host = ENV.fetch("SPANNER_EMULATOR_HOST", nil)
skip "SPANNER_EMULATOR_HOST not set" unless @emulator_host && !@emulator_host.empty?

require "spannerlib/pool"
@dsn = "projects/your-project-id/instances/test-instance/databases/test-database?autoConfigEmulator=true"

pool = Pool.create_pool(@dsn)
conn = pool.create_connection
ddl_batch_req = Google::Cloud::Spanner::V1::ExecuteBatchDmlRequest.new(
statements: [
Google::Cloud::Spanner::V1::ExecuteBatchDmlRequest::Statement.new(sql: "DROP TABLE IF EXISTS test_table"),
Google::Cloud::Spanner::V1::ExecuteBatchDmlRequest::Statement.new(
sql: "CREATE TABLE test_table (id INT64 NOT NULL, name STRING(100)) PRIMARY KEY(id)"
)
]
)
conn.execute_batch(ddl_batch_req)
conn.close
pool.close
end

before do
@pool = Pool.create_pool(@dsn)
@conn = @pool.create_connection
delete_req = Google::Cloud::Spanner::V1::BatchWriteRequest::MutationGroup.new(
mutations: [
Google::Cloud::Spanner::V1::Mutation.new(
delete: Google::Cloud::Spanner::V1::Mutation::Delete.new(
table: "test_table",
key_set: Google::Cloud::Spanner::V1::KeySet.new(all: true)
)
)
]
)
@conn.write_mutations(delete_req)
end

after do
@conn.close
@pool.close
end

it "tests a batch DML request" do
dml_batch_req = Google::Cloud::Spanner::V1::ExecuteBatchDmlRequest.new(
statements: [
Google::Cloud::Spanner::V1::ExecuteBatchDmlRequest::Statement.new(
sql: "INSERT INTO test_table (id, name) VALUES (1, 'name1')"
),
Google::Cloud::Spanner::V1::ExecuteBatchDmlRequest::Statement.new(
sql: "INSERT INTO test_table (id, name) VALUES (2, 'name2')"
),
Google::Cloud::Spanner::V1::ExecuteBatchDmlRequest::Statement.new(
sql: "UPDATE test_table SET name='name3' WHERE id=1"
)
]
)
resp = @conn.execute_batch(dml_batch_req)
expect(resp.result_sets.length).to eq 3

select_req = Google::Cloud::Spanner::V1::ExecuteSqlRequest.new(
sql: "SELECT id, name FROM test_table ORDER BY id"
)
rows = @conn.execute(select_req)
all_rows = rows.map { |row_bytes| Google::Protobuf::ListValue.decode(row_bytes) }

expect(all_rows.length).to eq 2
expect(all_rows[0].values[0].string_value).to eq "1"
expect(all_rows[0].values[1].string_value).to eq "name3"
expect(all_rows[1].values[0].string_value).to eq "2"
expect(all_rows[1].values[1].string_value).to eq "name2"
end

it "tests a batch DDL request" do
ddl_batch_req = Google::Cloud::Spanner::V1::ExecuteBatchDmlRequest.new(
statements: [
Google::Cloud::Spanner::V1::ExecuteBatchDmlRequest::Statement.new(
sql: "DROP TABLE IF EXISTS test_table"
),
Google::Cloud::Spanner::V1::ExecuteBatchDmlRequest::Statement.new(
sql: "CREATE TABLE test_table (key INT64 NOT NULL, data STRING(MAX)) PRIMARY KEY(key)"
)
]
)

expect { @conn.execute_batch(ddl_batch_req) }.not_to raise_error

insert_req = Google::Cloud::Spanner::V1::BatchWriteRequest::MutationGroup.new(
mutations: [
Google::Cloud::Spanner::V1::Mutation.new(
insert: Google::Cloud::Spanner::V1::Mutation::Write.new(
table: "test_table",
columns: %w[key data],
values: [
Google::Protobuf::ListValue.new(values: [
Google::Protobuf::Value.new(string_value: "101"),
Google::Protobuf::Value.new(string_value: "VerificationData")
])
]
)
)
]
)
expect { @conn.write_mutations(insert_req) }.not_to raise_error
end
end
Loading
Loading