@@ -699,7 +699,8 @@ def execute_partitioned_dml(
         )

         def execute_pdml():
-            with SessionCheckout(self._pool) as session:
+            def do_execute_pdml(session, span):
+                add_span_event(span, "Starting BeginTransaction")
                 txn = api.begin_transaction(
                     session=session.name, options=txn_options, metadata=metadata
                 )
@@ -732,6 +733,13 @@ def execute_pdml():

                 return result_set.stats.row_count_lower_bound

+            with trace_call(
+                "CloudSpanner.Database.execute_partitioned_pdml",
+                observability_options=self.observability_options,
+            ) as span:
+                with SessionCheckout(self._pool) as session:
+                    return do_execute_pdml(session, span)
+
         return _retry_on_aborted(execute_pdml, DEFAULT_RETRY_BACKOFF)()

     def session(self, labels=None, database_role=None):
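For context, `trace_call` and `add_span_event` are the library's OpenTelemetry helpers; because `execute_pdml` is the callable handed to `_retry_on_aborted`, each retried attempt re-enters `trace_call` and gets its own span. The sketch below only illustrates the pattern those helpers follow, written directly against the OpenTelemetry API; the names, signatures, and error handling here are assumptions, not the library's actual implementation (which also has to no-op when OpenTelemetry is not installed).

    # Simplified stand-in for the tracing helpers used above (assumption: the real
    # helpers live in the library's _opentelemetry_tracing module and do more).
    from contextlib import contextmanager

    from opentelemetry import trace
    from opentelemetry.trace import Status, StatusCode


    @contextmanager
    def trace_call(name, extra_attributes=None, observability_options=None):
        opts = observability_options or {}
        # Prefer a caller-supplied TracerProvider, else the globally registered one.
        provider = opts.get("tracer_provider") or trace.get_tracer_provider()
        tracer = provider.get_tracer(__name__)
        with tracer.start_as_current_span(name) as span:
            for key, value in (extra_attributes or {}).items():
                span.set_attribute(key, value)
            try:
                yield span
            except Exception as exc:
                # Mark the span as failed before re-raising.
                span.set_status(Status(StatusCode.ERROR, str(exc)))
                raise


    def add_span_event(span, event_name):
        # Record a point-in-time annotation such as "Starting BeginTransaction".
        span.add_event(event_name)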
@@ -1349,6 +1357,10 @@ def to_dict(self):
             "transaction_id": snapshot._transaction_id,
         }

+    @property
+    def observability_options(self):
+        return getattr(self._database, "observability_options", {})
+
     def _get_session(self):
         """Create session as needed.

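The new property lets a `BatchSnapshot` reuse whatever observability options its owning `Database` carries, falling back to an empty dict so the `trace_call` wrappers below always receive a usable mapping. A hedged wiring sketch follows; it assumes the client constructor accepts an `observability_options` mapping with `tracer_provider` and `enable_extended_tracing` keys, so verify that against the installed version before relying on it.

    # Hypothetical wiring sketch: export the new Database/BatchSnapshot spans to
    # the console. The project, instance, and database IDs are placeholders.
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter

    from google.cloud import spanner

    provider = TracerProvider()
    provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))

    client = spanner.Client(
        project="my-project",
        observability_options=dict(
            tracer_provider=provider,
            enable_extended_tracing=True,  # assumption: annotates spans with SQL text
        ),
    )
    database = client.instance("my-instance").database("my-database")

    # The new property resolves to this same dict on the snapshot, so partitioned
    # reads and queries are traced with the same provider.
    snapshot = database.batch_snapshot()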
@@ -1468,27 +1480,32 @@ def generate_read_batches(
             mappings of information used perform actual partitioned reads via
             :meth:`process_read_batch`.
         """
-        partitions = self._get_snapshot().partition_read(
-            table=table,
-            columns=columns,
-            keyset=keyset,
-            index=index,
-            partition_size_bytes=partition_size_bytes,
-            max_partitions=max_partitions,
-            retry=retry,
-            timeout=timeout,
-        )
+        with trace_call(
+            f"CloudSpanner.{type(self).__name__}.generate_read_batches",
+            extra_attributes=dict(table=table, columns=columns),
+            observability_options=self.observability_options,
+        ):
+            partitions = self._get_snapshot().partition_read(
+                table=table,
+                columns=columns,
+                keyset=keyset,
+                index=index,
+                partition_size_bytes=partition_size_bytes,
+                max_partitions=max_partitions,
+                retry=retry,
+                timeout=timeout,
+            )

-        read_info = {
-            "table": table,
-            "columns": columns,
-            "keyset": keyset._to_dict(),
-            "index": index,
-            "data_boost_enabled": data_boost_enabled,
-            "directed_read_options": directed_read_options,
-        }
-        for partition in partitions:
-            yield {"partition": partition, "read": read_info.copy()}
+            read_info = {
+                "table": table,
+                "columns": columns,
+                "keyset": keyset._to_dict(),
+                "index": index,
+                "data_boost_enabled": data_boost_enabled,
+                "directed_read_options": directed_read_options,
+            }
+            for partition in partitions:
+                yield {"partition": partition, "read": read_info.copy()}

     def process_read_batch(
         self,
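Because the span name is built from `type(self).__name__`, a plain `BatchSnapshot` emits `CloudSpanner.BatchSnapshot.generate_read_batches`, with the table and column list attached as attributes; and since the method is a generator, the `with trace_call(...)` block is only entered once the caller starts iterating. A usage sketch with placeholder identifiers, table, and columns:

    # Hypothetical read-batch workflow; "Singers", its columns, and the IDs are
    # placeholders, not values taken from this change.
    from google.cloud import spanner
    from google.cloud.spanner_v1.keyset import KeySet

    client = spanner.Client(project="my-project")
    database = client.instance("my-instance").database("my-database")

    snapshot = database.batch_snapshot()
    for batch in snapshot.generate_read_batches(
        table="Singers",
        columns=("SingerId", "FirstName", "LastName"),
        keyset=KeySet(all_=True),
    ):
        # Each replayed batch gets its own process_read_batch span (see below).
        for row in snapshot.process_read_batch(batch):
            print(row)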
@@ -1514,12 +1531,17 @@ def process_read_batch(
         :rtype: :class:`~google.cloud.spanner_v1.streamed.StreamedResultSet`
         :returns: a result set instance which can be used to consume rows.
         """
-        kwargs = copy.deepcopy(batch["read"])
-        keyset_dict = kwargs.pop("keyset")
-        kwargs["keyset"] = KeySet._from_dict(keyset_dict)
-        return self._get_snapshot().read(
-            partition=batch["partition"], **kwargs, retry=retry, timeout=timeout
-        )
+        observability_options = self.observability_options or {}
+        with trace_call(
+            f"CloudSpanner.{type(self).__name__}.process_read_batch",
+            observability_options=observability_options,
+        ):
+            kwargs = copy.deepcopy(batch["read"])
+            keyset_dict = kwargs.pop("keyset")
+            kwargs["keyset"] = KeySet._from_dict(keyset_dict)
+            return self._get_snapshot().read(
+                partition=batch["partition"], **kwargs, retry=retry, timeout=timeout
+            )

     def generate_query_batches(
         self,
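`process_read_batch` replays one of those plain-dict batches (the `KeySet` was flattened with `_to_dict()`), so batches can still be pickled and shipped to worker processes; whichever process replays a batch is the one that emits its span. A hedged hand-off sketch, assuming `BatchSnapshot.from_dict(database, mapping)` as the reconstruction entry point and placeholder identifiers:

    # Hypothetical fan-out sketch: ship batches to a worker as pickled dicts.
    import pickle

    from google.cloud import spanner
    from google.cloud.spanner_v1.database import BatchSnapshot
    from google.cloud.spanner_v1.keyset import KeySet

    client = spanner.Client(project="my-project")  # placeholder IDs throughout
    database = client.instance("my-instance").database("my-database")

    snapshot = database.batch_snapshot()
    snapshot_state = pickle.dumps(snapshot.to_dict())
    batches = [
        pickle.dumps(batch)
        for batch in snapshot.generate_read_batches(
            table="Singers", columns=("SingerId",), keyset=KeySet(all_=True)
        )
    ]

    # In a worker process: rebuild the snapshot and replay one batch; the
    # process_read_batch span is emitted on the worker side.
    worker_snapshot = BatchSnapshot.from_dict(database, pickle.loads(snapshot_state))
    for row in worker_snapshot.process_read_batch(pickle.loads(batches[0])):
        print(row)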
@@ -1594,34 +1616,39 @@ def generate_query_batches(
             mappings of information used perform actual partitioned reads via
             :meth:`process_read_batch`.
         """
-        partitions = self._get_snapshot().partition_query(
-            sql=sql,
-            params=params,
-            param_types=param_types,
-            partition_size_bytes=partition_size_bytes,
-            max_partitions=max_partitions,
-            retry=retry,
-            timeout=timeout,
-        )
+        with trace_call(
+            f"CloudSpanner.{type(self).__name__}.generate_query_batches",
+            extra_attributes=dict(sql=sql),
+            observability_options=self.observability_options,
+        ):
+            partitions = self._get_snapshot().partition_query(
+                sql=sql,
+                params=params,
+                param_types=param_types,
+                partition_size_bytes=partition_size_bytes,
+                max_partitions=max_partitions,
+                retry=retry,
+                timeout=timeout,
+            )

-        query_info = {
-            "sql": sql,
-            "data_boost_enabled": data_boost_enabled,
-            "directed_read_options": directed_read_options,
-        }
-        if params:
-            query_info["params"] = params
-            query_info["param_types"] = param_types
-
-        # Query-level options have higher precedence than client-level and
-        # environment-level options
-        default_query_options = self._database._instance._client._query_options
-        query_info["query_options"] = _merge_query_options(
-            default_query_options, query_options
-        )
+            query_info = {
+                "sql": sql,
+                "data_boost_enabled": data_boost_enabled,
+                "directed_read_options": directed_read_options,
+            }
+            if params:
+                query_info["params"] = params
+                query_info["param_types"] = param_types
+
+            # Query-level options have higher precedence than client-level and
+            # environment-level options
+            default_query_options = self._database._instance._client._query_options
+            query_info["query_options"] = _merge_query_options(
+                default_query_options, query_options
+            )

-        for partition in partitions:
-            yield {"partition": partition, "query": query_info}
+            for partition in partitions:
+                yield {"partition": partition, "query": query_info}

     def process_query_batch(
         self,
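The query path mirrors the read path: the span carries the SQL text as an attribute, and the existing precedence rule (query-level `query_options` merged over client- and environment-level defaults via `_merge_query_options`) is unchanged, just executed inside the span. A usage sketch with a placeholder query and identifiers:

    # Hypothetical query-batch workflow; the SQL, parameter, and IDs are placeholders.
    from google.cloud import spanner
    from google.cloud.spanner_v1 import param_types

    client = spanner.Client(project="my-project")
    database = client.instance("my-instance").database("my-database")

    snapshot = database.batch_snapshot()
    for batch in snapshot.generate_query_batches(
        sql="SELECT SingerId, FirstName FROM Singers WHERE LastName = @last",
        params={"last": "Smith"},
        param_types={"last": param_types.STRING},
    ):
        # Each batch is replayed under its own process_query_batch span.
        for row in snapshot.process_query_batch(batch):
            print(row)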
@@ -1646,9 +1673,16 @@ def process_query_batch(
         :rtype: :class:`~google.cloud.spanner_v1.streamed.StreamedResultSet`
         :returns: a result set instance which can be used to consume rows.
         """
-        return self._get_snapshot().execute_sql(
-            partition=batch["partition"], **batch["query"], retry=retry, timeout=timeout
-        )
+        with trace_call(
+            f"CloudSpanner.{type(self).__name__}.process_query_batch",
+            observability_options=self.observability_options,
+        ):
+            return self._get_snapshot().execute_sql(
+                partition=batch["partition"],
+                **batch["query"],
+                retry=retry,
+                timeout=timeout,
+            )

     def run_partitioned_query(
         self,
@@ -1703,18 +1737,23 @@ def run_partitioned_query(
         :rtype: :class:`~google.cloud.spanner_v1.merged_result_set.MergedResultSet`
         :returns: a result set instance which can be used to consume rows.
         """
-        partitions = list(
-            self.generate_query_batches(
-                sql,
-                params,
-                param_types,
-                partition_size_bytes,
-                max_partitions,
-                query_options,
-                data_boost_enabled,
+        with trace_call(
+            f"CloudSpanner.{type(self).__name__}.run_partitioned_query",
+            extra_attributes=dict(sql=sql),
+            observability_options=self.observability_options,
+        ):
+            partitions = list(
+                self.generate_query_batches(
+                    sql,
+                    params,
+                    param_types,
+                    partition_size_bytes,
+                    max_partitions,
+                    query_options,
+                    data_boost_enabled,
+                )
             )
-        )
-        return MergedResultSet(self, partitions, 0)
+            return MergedResultSet(self, partitions, 0)

     def process(self, batch):
         """Process a single, partitioned query or read.
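`run_partitioned_query` now opens a parent span around batch generation and still returns a `MergedResultSet` that drives the per-partition work (the final argument of 0 presumably lets the library pick its default parallelism). A usage sketch with placeholder identifiers and SQL:

    # Hypothetical single-call partitioned query; identifiers and SQL are placeholders.
    from google.cloud import spanner

    client = spanner.Client(project="my-project")
    database = client.instance("my-instance").database("my-database")

    snapshot = database.batch_snapshot()
    # Emits a CloudSpanner.BatchSnapshot.run_partitioned_query span (with the SQL
    # attached as an attribute) around the generate_query_batches call above.
    results = snapshot.run_partitioned_query(
        sql="SELECT SingerId, FirstName FROM Singers",
        data_boost_enabled=True,
    )
    for row in results:
        print(row)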