@@ -699,7 +699,8 @@ def execute_partitioned_dml(
         )
 
         def execute_pdml():
-            with SessionCheckout(self._pool) as session:
+            def do_execute_pdml(session, span):
+                add_span_event(span, "Starting BeginTransaction")
                 txn = api.begin_transaction(
                     session=session.name, options=txn_options, metadata=metadata
                 )
@@ -732,6 +733,13 @@ def execute_pdml():
 
                 return result_set.stats.row_count_lower_bound
 
+            with trace_call(
+                "CloudSpanner.Database.execute_partitioned_pdml",
+                observability_options=self.observability_options,
+            ) as span:
+                with SessionCheckout(self._pool) as session:
+                    return do_execute_pdml(session, span)
+
         return _retry_on_aborted(execute_pdml, DEFAULT_RETRY_BACKOFF)()
 
     def session(self, labels=None, database_role=None):
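The wrapped PDML path above depends on the library's `trace_call` context manager and `add_span_event` helper. For orientation, here is a minimal sketch of what such helpers can look like when written directly against the public OpenTelemetry API; it is illustrative only, not the library's implementation, and the treatment of `observability_options` (for example, honoring a caller-supplied tracer provider) is an assumption.

```python
# Illustrative sketch only, not the library's implementation.
from contextlib import contextmanager

from opentelemetry import trace
from opentelemetry.trace.status import Status, StatusCode


@contextmanager
def trace_call_sketch(name, extra_attributes=None, observability_options=None):
    # A real helper would consult observability_options (e.g. a custom
    # tracer_provider); this sketch always uses the globally configured tracer.
    tracer = trace.get_tracer(__name__)
    with tracer.start_as_current_span(name) as span:
        if extra_attributes:
            for key, value in extra_attributes.items():
                span.set_attribute(key, str(value))
        try:
            yield span
        except Exception as exc:
            # Surface failures on the span before re-raising.
            span.record_exception(exc)
            span.set_status(Status(StatusCode.ERROR))
            raise


def add_span_event_sketch(span, event_name):
    # add_span_event() in the diff presumably maps to OpenTelemetry's add_event().
    span.add_event(event_name)
```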
@@ -1357,6 +1365,10 @@ def to_dict(self):
             "transaction_id": snapshot._transaction_id,
         }
 
+    @property
+    def observability_options(self):
+        return getattr(self._database, "observability_options", {})
+
     def _get_session(self):
         """Create session as needed.
 
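The new `observability_options` property simply forwards whatever the owning database exposes and falls back to an empty dict when the attribute is missing (for example, when tests stub `_database` with a bare object). A tiny self-contained sketch of that delegation pattern, with a placeholder option key:

```python
# Sketch of the getattr-based delegation; the option key is a placeholder,
# not an authoritative schema.
class FakeDatabase:
    observability_options = {"tracer_provider": None}


class FakeBatchSnapshot:
    def __init__(self, database):
        self._database = database

    @property
    def observability_options(self):
        # Same pattern as the diff: forward the database's options, or {}.
        return getattr(self._database, "observability_options", {})


assert FakeBatchSnapshot(FakeDatabase()).observability_options == {"tracer_provider": None}
assert FakeBatchSnapshot(object()).observability_options == {}
```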
@@ -1476,27 +1488,32 @@ def generate_read_batches(
         mappings of information used perform actual partitioned reads via
         :meth:`process_read_batch`.
         """
-        partitions = self._get_snapshot().partition_read(
-            table=table,
-            columns=columns,
-            keyset=keyset,
-            index=index,
-            partition_size_bytes=partition_size_bytes,
-            max_partitions=max_partitions,
-            retry=retry,
-            timeout=timeout,
-        )
+        with trace_call(
+            f"CloudSpanner.{type(self).__name__}.generate_read_batches",
+            extra_attributes=dict(table=table, columns=columns),
+            observability_options=self.observability_options,
+        ):
+            partitions = self._get_snapshot().partition_read(
+                table=table,
+                columns=columns,
+                keyset=keyset,
+                index=index,
+                partition_size_bytes=partition_size_bytes,
+                max_partitions=max_partitions,
+                retry=retry,
+                timeout=timeout,
+            )
 
-        read_info = {
-            "table": table,
-            "columns": columns,
-            "keyset": keyset._to_dict(),
-            "index": index,
-            "data_boost_enabled": data_boost_enabled,
-            "directed_read_options": directed_read_options,
-        }
-        for partition in partitions:
-            yield {"partition": partition, "read": read_info.copy()}
+            read_info = {
+                "table": table,
+                "columns": columns,
+                "keyset": keyset._to_dict(),
+                "index": index,
+                "data_boost_enabled": data_boost_enabled,
+                "directed_read_options": directed_read_options,
+            }
+            for partition in partitions:
+                yield {"partition": partition, "read": read_info.copy()}
 
     def process_read_batch(
         self,
@@ -1522,12 +1539,17 @@ def process_read_batch(
         :rtype: :class:`~google.cloud.spanner_v1.streamed.StreamedResultSet`
         :returns: a result set instance which can be used to consume rows.
         """
-        kwargs = copy.deepcopy(batch["read"])
-        keyset_dict = kwargs.pop("keyset")
-        kwargs["keyset"] = KeySet._from_dict(keyset_dict)
-        return self._get_snapshot().read(
-            partition=batch["partition"], **kwargs, retry=retry, timeout=timeout
-        )
+        observability_options = self.observability_options or {}
+        with trace_call(
+            f"CloudSpanner.{type(self).__name__}.process_read_batch",
+            observability_options=observability_options,
+        ):
+            kwargs = copy.deepcopy(batch["read"])
+            keyset_dict = kwargs.pop("keyset")
+            kwargs["keyset"] = KeySet._from_dict(keyset_dict)
+            return self._get_snapshot().read(
+                partition=batch["partition"], **kwargs, retry=retry, timeout=timeout
+            )
 
     def generate_query_batches(
         self,
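For context, a usage sketch of the partitioned-read flow these spans now cover. Project, instance, database, table, and column names are placeholders; once a tracer provider is configured, `generate_read_batches` and each `process_read_batch` call emit spans named like `CloudSpanner.BatchSnapshot.generate_read_batches`.

```python
# Usage sketch with placeholder identifiers throughout.
from google.cloud import spanner
from google.cloud.spanner_v1 import KeySet

client = spanner.Client(project="my-project")            # placeholder project
database = client.instance("my-instance").database("my-db")

snapshot = database.batch_snapshot()
batches = snapshot.generate_read_batches(
    table="Singers",                                      # placeholder table
    columns=("SingerId", "FirstName"),
    keyset=KeySet(all_=True),
)
for batch in batches:
    for row in snapshot.process_read_batch(batch):
        print(row)
```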
@@ -1602,34 +1624,39 @@ def generate_query_batches(
         mappings of information used perform actual partitioned reads via
         :meth:`process_read_batch`.
         """
-        partitions = self._get_snapshot().partition_query(
-            sql=sql,
-            params=params,
-            param_types=param_types,
-            partition_size_bytes=partition_size_bytes,
-            max_partitions=max_partitions,
-            retry=retry,
-            timeout=timeout,
-        )
+        with trace_call(
+            f"CloudSpanner.{type(self).__name__}.generate_query_batches",
+            extra_attributes=dict(sql=sql),
+            observability_options=self.observability_options,
+        ):
+            partitions = self._get_snapshot().partition_query(
+                sql=sql,
+                params=params,
+                param_types=param_types,
+                partition_size_bytes=partition_size_bytes,
+                max_partitions=max_partitions,
+                retry=retry,
+                timeout=timeout,
+            )
 
-        query_info = {
-            "sql": sql,
-            "data_boost_enabled": data_boost_enabled,
-            "directed_read_options": directed_read_options,
-        }
-        if params:
-            query_info["params"] = params
-            query_info["param_types"] = param_types
-
-        # Query-level options have higher precedence than client-level and
-        # environment-level options
-        default_query_options = self._database._instance._client._query_options
-        query_info["query_options"] = _merge_query_options(
-            default_query_options, query_options
-        )
+            query_info = {
+                "sql": sql,
+                "data_boost_enabled": data_boost_enabled,
+                "directed_read_options": directed_read_options,
+            }
+            if params:
+                query_info["params"] = params
+                query_info["param_types"] = param_types
+
+            # Query-level options have higher precedence than client-level and
+            # environment-level options
+            default_query_options = self._database._instance._client._query_options
+            query_info["query_options"] = _merge_query_options(
+                default_query_options, query_options
+            )
 
-        for partition in partitions:
-            yield {"partition": partition, "query": query_info}
+            for partition in partitions:
+                yield {"partition": partition, "query": query_info}
 
     def process_query_batch(
         self,
@@ -1654,9 +1681,16 @@ def process_query_batch(
         :rtype: :class:`~google.cloud.spanner_v1.streamed.StreamedResultSet`
         :returns: a result set instance which can be used to consume rows.
         """
-        return self._get_snapshot().execute_sql(
-            partition=batch["partition"], **batch["query"], retry=retry, timeout=timeout
-        )
+        with trace_call(
+            f"CloudSpanner.{type(self).__name__}.process_query_batch",
+            observability_options=self.observability_options,
+        ):
+            return self._get_snapshot().execute_sql(
+                partition=batch["partition"],
+                **batch["query"],
+                retry=retry,
+                timeout=timeout,
+            )
 
     def run_partitioned_query(
         self,
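And the matching sketch for the partitioned-query flow (placeholders again): one span covers partitioning in `generate_query_batches`, and each `process_query_batch` call gets its own span.

```python
# Usage sketch for the query path; identifiers and SQL are placeholders.
from google.cloud import spanner

client = spanner.Client(project="my-project")
database = client.instance("my-instance").database("my-db")

snapshot = database.batch_snapshot()
batches = snapshot.generate_query_batches(
    sql="SELECT SingerId FROM Singers WHERE FirstName = @name",
    params={"name": "Marc"},
    param_types={"name": spanner.param_types.STRING},
)
for batch in batches:
    for row in snapshot.process_query_batch(batch):
        print(row)
```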
@@ -1711,18 +1745,23 @@ def run_partitioned_query(
         :rtype: :class:`~google.cloud.spanner_v1.merged_result_set.MergedResultSet`
         :returns: a result set instance which can be used to consume rows.
         """
-        partitions = list(
-            self.generate_query_batches(
-                sql,
-                params,
-                param_types,
-                partition_size_bytes,
-                max_partitions,
-                query_options,
-                data_boost_enabled,
+        with trace_call(
+            f"CloudSpanner.{type(self).__name__}.run_partitioned_query",
+            extra_attributes=dict(sql=sql),
+            observability_options=self.observability_options,
+        ):
+            partitions = list(
+                self.generate_query_batches(
+                    sql,
+                    params,
+                    param_types,
+                    partition_size_bytes,
+                    max_partitions,
+                    query_options,
+                    data_boost_enabled,
+                )
             )
-        )
-        return MergedResultSet(self, partitions, 0)
+            return MergedResultSet(self, partitions, 0)
 
     def process(self, batch):
         """Process a single, partitioned query or read.
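Since `observability_options` now reaches every batch-level span, the change can be exercised end to end with an in-memory exporter from the OpenTelemetry SDK. How the options dict is handed to the Spanner client is an assumption here (shown only as a plain dict with a `tracer_provider` key), so treat that part as illustrative.

```python
# Verification sketch using stock OpenTelemetry SDK test utilities.
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter

exporter = InMemorySpanExporter()
provider = TracerProvider()
provider.add_span_processor(SimpleSpanProcessor(exporter))

# Assumed shape; how this reaches the client is not shown in the diff.
observability_options = {"tracer_provider": provider}

# ... run the partitioned read/query code paths with a client configured to
# use `observability_options`, then inspect what was recorded:
span_names = [span.name for span in exporter.get_finished_spans()]
# Expected entries include names such as
# "CloudSpanner.BatchSnapshot.run_partitioned_query" and
# "CloudSpanner.Database.execute_partitioned_pdml".
print(span_names)
```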