2626 _metadata_with_prefix ,
2727 _metadata_with_leader_aware_routing ,
2828)
29- from google .cloud .spanner_v1 ._opentelemetry_tracing import trace_call
29+ from google .cloud .spanner_v1 ._opentelemetry_tracing import (
30+ add_event_on_current_span ,
31+ trace_call ,
32+ trace_call_end_lazily ,
33+ )
3034from google .cloud .spanner_v1 import RequestOptions
3135from google .cloud .spanner_v1 ._helpers import _retry
3236from google .cloud .spanner_v1 ._helpers import _check_rst_stream_error
@@ -46,6 +50,12 @@ class _BatchBase(_SessionWrapper):
4650 def __init__ (self , session ):
4751 super (_BatchBase , self ).__init__ (session )
4852 self ._mutations = []
53+ self .__base_discard_span = trace_call_end_lazily (
54+ f"CloudSpanner.{ type (self ).__name__ } " ,
55+ self ._session ,
56+ None ,
57+ getattr (self ._session ._database , "observability_options" , None ),
58+ )
4959
5060 def _check_state (self ):
5161 """Helper for :meth:`commit` et al.
@@ -69,6 +79,10 @@ def insert(self, table, columns, values):
6979 :type values: list of lists
7080 :param values: Values to be modified.
7181 """
82+ add_event_on_current_span (
83+ "insert mutations added" ,
84+ dict (table = table , columns = columns ),
85+ )
7286 self ._mutations .append (Mutation (insert = _make_write_pb (table , columns , values )))
7387
7488 def update (self , table , columns , values ):
@@ -84,6 +98,10 @@ def update(self, table, columns, values):
8498 :param values: Values to be modified.
8599 """
86100 self ._mutations .append (Mutation (update = _make_write_pb (table , columns , values )))
101+ add_event_on_current_span (
102+ "update mutations added" ,
103+ dict (table = table , columns = columns ),
104+ )
87105
88106 def insert_or_update (self , table , columns , values ):
89107 """Insert/update one or more table rows.
@@ -100,6 +118,10 @@ def insert_or_update(self, table, columns, values):
100118 self ._mutations .append (
101119 Mutation (insert_or_update = _make_write_pb (table , columns , values ))
102120 )
121+ add_event_on_current_span (
122+ "insert_or_update mutations added" ,
123+ dict (table = table , columns = columns ),
124+ )
103125
104126 def replace (self , table , columns , values ):
105127 """Replace one or more table rows.
@@ -114,6 +136,10 @@ def replace(self, table, columns, values):
114136 :param values: Values to be modified.
115137 """
116138 self ._mutations .append (Mutation (replace = _make_write_pb (table , columns , values )))
139+ add_event_on_current_span (
140+ "replace mutations added" ,
141+ dict (table = table , columns = columns ),
142+ )
117143
118144 def delete (self , table , keyset ):
119145 """Delete one or more table rows.
@@ -126,6 +152,21 @@ def delete(self, table, keyset):
126152 """
127153 delete = Mutation .Delete (table = table , key_set = keyset ._to_pb ())
128154 self ._mutations .append (Mutation (delete = delete ))
155+ add_event_on_current_span (
156+ "delete mutations added" ,
157+ dict (table = table ),
158+ )
159+
160+ def _discard_on_end (self , exc_type = None , exc_val = None , exc_traceback = None ):
161+ if self .__base_discard_span :
162+ self .__base_discard_span (exc_type , exc_val , exc_traceback )
163+ self .__base_discard_span = None
164+
165+ def __exit__ (self , exc_type = None , exc_value = None , exc_traceback = None ):
166+ self ._discard_on_end (exc_type , exc_val , exc_traceback )
167+
168+ def __enter__ (self ):
169+ return self
129170
130171
131172class Batch (_BatchBase ):
@@ -207,7 +248,7 @@ def commit(
207248 )
208249 observability_options = getattr (database , "observability_options" , None )
209250 with trace_call (
210- "CloudSpanner.Commit " ,
251+ "CloudSpanner.Batch.commit " ,
211252 self ._session ,
212253 trace_attributes ,
213254 observability_options = observability_options ,
@@ -223,18 +264,31 @@ def commit(
223264 )
224265 self .committed = response .commit_timestamp
225266 self .commit_stats = response .commit_stats
267+ self ._discard_on_end ()
226268 return self .committed
227269
228270 def __enter__ (self ):
229271 """Begin ``with`` block."""
230272 self ._check_state ()
273+ observability_options = getattr (
274+ self ._session ._database , "observability_options" , None
275+ )
276+ self .__discard_span = trace_call_end_lazily (
277+ "CloudSpanner.Batch" ,
278+ self ._session ,
279+ observability_options = observability_options ,
280+ )
231281
232282 return self
233283
234284 def __exit__ (self , exc_type , exc_val , exc_tb ):
235285 """End ``with`` block."""
236286 if exc_type is None :
237287 self .commit ()
288+ if self .__discard_span :
289+ self .__discard_span (exc_type , exc_val , exc_tb )
290+ self .__discard_span = None
291+ self ._discard_on_end ()
238292
239293
240294class MutationGroup (_BatchBase ):
@@ -326,7 +380,7 @@ def batch_write(self, request_options=None, exclude_txn_from_change_streams=Fals
326380 )
327381 observability_options = getattr (database , "observability_options" , None )
328382 with trace_call (
329- "CloudSpanner.BatchWrite " ,
383+ "CloudSpanner.batch_write " ,
330384 self ._session ,
331385 trace_attributes ,
332386 observability_options = observability_options ,
0 commit comments