@@ -107,7 +107,7 @@ def test_propagation(enable_extended_tracing):
107107 gotNames = [span .name for span in from_inject_spans ]
108108 wantNames = [
109109 "CloudSpanner.CreateSession" ,
110- "CloudSpanner.Snapshot.execute_streaming_sql " ,
110+ "CloudSpanner.Snapshot.execute_sql " ,
111111 ]
112112 assert gotNames == wantNames
113113
@@ -216,8 +216,8 @@ def select_in_txn(txn):
216216 "CloudSpanner.Database.run_in_transaction" ,
217217 "CloudSpanner.CreateSession" ,
218218 "CloudSpanner.Session.run_in_transaction" ,
219- "CloudSpanner.Transaction.execute_streaming_sql " ,
220- "CloudSpanner.Transaction.execute_streaming_sql " ,
219+ "CloudSpanner.Transaction.execute_sql " ,
220+ "CloudSpanner.Transaction.execute_sql " ,
221221 "CloudSpanner.Transaction.commit" ,
222222 ]
223223
@@ -262,13 +262,206 @@ def select_in_txn(txn):
262262 ("CloudSpanner.Database.run_in_transaction" , codes .OK , None ),
263263 ("CloudSpanner.CreateSession" , codes .OK , None ),
264264 ("CloudSpanner.Session.run_in_transaction" , codes .OK , None ),
265- ("CloudSpanner.Transaction.execute_streaming_sql " , codes .OK , None ),
266- ("CloudSpanner.Transaction.execute_streaming_sql " , codes .OK , None ),
265+ ("CloudSpanner.Transaction.execute_sql " , codes .OK , None ),
266+ ("CloudSpanner.Transaction.execute_sql " , codes .OK , None ),
267267 ("CloudSpanner.Transaction.commit" , codes .OK , None ),
268268 ]
269269 assert got_statuses == want_statuses
270270
271271
272+ @pytest .mark .skipif (
273+ not _helpers .USE_EMULATOR ,
274+ reason = "Emulator needed to run this tests" ,
275+ )
276+ @pytest .mark .skipif (
277+ not HAS_OTEL_INSTALLED ,
278+ reason = "Tracing requires OpenTelemetry" ,
279+ )
280+ def test_transaction_update_implicit_begin_nested_inside_commit ():
281+ # Tests to ensure that transaction.commit() without a began transaction
282+ # has transaction.begin() inlined and nested under the commit span.
283+ from google .auth .credentials import AnonymousCredentials
284+ from opentelemetry .sdk .trace .export import SimpleSpanProcessor
285+ from opentelemetry .sdk .trace .export .in_memory_span_exporter import (
286+ InMemorySpanExporter ,
287+ )
288+ from opentelemetry .trace .status import StatusCode
289+ from opentelemetry .sdk .trace import TracerProvider
290+ from opentelemetry .sdk .trace .sampling import ALWAYS_ON
291+
292+ PROJECT = _helpers .EMULATOR_PROJECT
293+ CONFIGURATION_NAME = "config-name"
294+ INSTANCE_ID = _helpers .INSTANCE_ID
295+ DISPLAY_NAME = "display-name"
296+ DATABASE_ID = _helpers .unique_id ("temp_db" )
297+ NODE_COUNT = 5
298+ LABELS = {"test" : "true" }
299+
300+ def tx_update (txn ):
301+ txn .update (
302+ "Singers" ,
303+ columns = ["SingerId" , "FirstName" ],
304+ values = [["1" , "Bryan" ], ["2" , "Slash" ]],
305+ )
306+
307+ tracer_provider = TracerProvider (sampler = ALWAYS_ON )
308+ trace_exporter = InMemorySpanExporter ()
309+ tracer_provider .add_span_processor (SimpleSpanProcessor (trace_exporter ))
310+ observability_options = dict (
311+ tracer_provider = tracer_provider ,
312+ enable_extended_tracing = True ,
313+ )
314+
315+ client = Client (
316+ project = PROJECT ,
317+ observability_options = observability_options ,
318+ credentials = AnonymousCredentials (),
319+ )
320+
321+ instance = client .instance (
322+ INSTANCE_ID ,
323+ CONFIGURATION_NAME ,
324+ display_name = DISPLAY_NAME ,
325+ node_count = NODE_COUNT ,
326+ labels = LABELS ,
327+ )
328+
329+ try :
330+ instance .create ()
331+ except Exception :
332+ pass
333+
334+ db = instance .database (DATABASE_ID )
335+ try :
336+ db ._ddl_statements = [
337+ """CREATE TABLE Singers (
338+ SingerId INT64 NOT NULL,
339+ FirstName STRING(1024),
340+ LastName STRING(1024),
341+ SingerInfo BYTES(MAX),
342+ FullName STRING(2048) AS (
343+ ARRAY_TO_STRING([FirstName, LastName], " ")
344+ ) STORED
345+ ) PRIMARY KEY (SingerId)""" ,
346+ """CREATE TABLE Albums (
347+ SingerId INT64 NOT NULL,
348+ AlbumId INT64 NOT NULL,
349+ AlbumTitle STRING(MAX),
350+ MarketingBudget INT64,
351+ ) PRIMARY KEY (SingerId, AlbumId),
352+ INTERLEAVE IN PARENT Singers ON DELETE CASCADE""" ,
353+ ]
354+ db .create ()
355+ except Exception :
356+ pass
357+
358+ try :
359+ db .run_in_transaction (tx_update )
360+ except Exception :
361+ pass
362+
363+ span_list = trace_exporter .get_finished_spans ()
364+ # Sort the spans by their start time in the hierarchy.
365+ span_list = sorted (span_list , key = lambda span : span .start_time )
366+ got_span_names = [span .name for span in span_list ]
367+ want_span_names = [
368+ "CloudSpanner.Database.run_in_transaction" ,
369+ "CloudSpanner.CreateSession" ,
370+ "CloudSpanner.Session.run_in_transaction" ,
371+ "CloudSpanner.Transaction.commit" ,
372+ "CloudSpanner.Transaction.begin" ,
373+ ]
374+
375+ assert got_span_names == want_span_names
376+
377+ # Our object is to ensure that .begin() is a child of .commit()
378+ span_tx_begin = span_list [- 1 ]
379+ span_tx_commit = span_list [- 2 ]
380+ assert span_tx_begin .parent .span_id == span_tx_commit .context .span_id
381+
382+ got_events = []
383+ got_statuses = []
384+
385+ # Some event attributes are noisy/highly ephemeral
386+ # and can't be directly compared against.
387+ imprecise_event_attributes = ["exception.stacktrace" , "delay_seconds" , "cause" ]
388+ for span in span_list :
389+ got_statuses .append (
390+ (span .name , span .status .status_code , span .status .description )
391+ )
392+ for event in span .events :
393+ evt_attributes = event .attributes .copy ()
394+ for attr_name in imprecise_event_attributes :
395+ if attr_name in evt_attributes :
396+ evt_attributes [attr_name ] = "EPHEMERAL"
397+
398+ got_events .append ((event .name , evt_attributes ))
399+
400+ # Check for the series of events
401+ want_events = [
402+ ("Acquiring session" , {"kind" : "BurstyPool" }),
403+ ("Waiting for a session to become available" , {"kind" : "BurstyPool" }),
404+ ("No sessions available in pool. Creating session" , {"kind" : "BurstyPool" }),
405+ ("Creating Session" , {}),
406+ (
407+ "exception" ,
408+ {
409+ "exception.type" : "google.api_core.exceptions.NotFound" ,
410+ "exception.message" : "404 Table Singers: Row {Int64(1)} not found." ,
411+ "exception.stacktrace" : "EPHEMERAL" ,
412+ "exception.escaped" : "False" ,
413+ },
414+ ),
415+ (
416+ "Transaction.commit failed due to GoogleAPICallError, not retrying" ,
417+ {"attempt" : 1 },
418+ ),
419+ (
420+ "exception" ,
421+ {
422+ "exception.type" : "google.api_core.exceptions.NotFound" ,
423+ "exception.message" : "404 Table Singers: Row {Int64(1)} not found." ,
424+ "exception.stacktrace" : "EPHEMERAL" ,
425+ "exception.escaped" : "False" ,
426+ },
427+ ),
428+ ("Starting Commit" , {}),
429+ (
430+ "exception" ,
431+ {
432+ "exception.type" : "google.api_core.exceptions.NotFound" ,
433+ "exception.message" : "404 Table Singers: Row {Int64(1)} not found." ,
434+ "exception.stacktrace" : "EPHEMERAL" ,
435+ "exception.escaped" : "False" ,
436+ },
437+ ),
438+ ]
439+ assert got_events == want_events
440+
441+ # Check for the statues.
442+ codes = StatusCode
443+ want_statuses = [
444+ (
445+ "CloudSpanner.Database.run_in_transaction" ,
446+ codes .ERROR ,
447+ "NotFound: 404 Table Singers: Row {Int64(1)} not found." ,
448+ ),
449+ ("CloudSpanner.CreateSession" , codes .OK , None ),
450+ (
451+ "CloudSpanner.Session.run_in_transaction" ,
452+ codes .ERROR ,
453+ "NotFound: 404 Table Singers: Row {Int64(1)} not found." ,
454+ ),
455+ (
456+ "CloudSpanner.Transaction.commit" ,
457+ codes .ERROR ,
458+ "NotFound: 404 Table Singers: Row {Int64(1)} not found." ,
459+ ),
460+ ("CloudSpanner.Transaction.begin" , codes .OK , None ),
461+ ]
462+ assert got_statuses == want_statuses
463+
464+
272465def _make_credentials ():
273466 from google .auth .credentials import AnonymousCredentials
274467
0 commit comments