1818
1919import static com .google .cloud .spanner .TransactionRunner .TransactionCallable ;
2020
21+ import com .google .api .gax .core .FixedCredentialsProvider ;
2122import com .google .api .gax .longrunning .OperationFuture ;
2223import com .google .api .gax .paging .Page ;
2324import com .google .api .gax .retrying .RetrySettings ;
7071import com .google .cloud .spanner .TimestampBound ;
7172import com .google .cloud .spanner .TransactionContext ;
7273import com .google .cloud .spanner .TransactionRunner ;
74+ import com .google .cloud .spanner .TransactionRunner .TransactionCallable ;
7375import com .google .cloud .spanner .Type ;
7476import com .google .cloud .spanner .Value ;
7577import com .google .cloud .spanner .encryption .CustomerManagedEncryption ;
7678import com .google .cloud .spanner .v1 .stub .SpannerStubSettings ;
79+ import com .google .cloud .trace .v1 .TraceServiceClient ;
80+ import com .google .cloud .trace .v1 .TraceServiceSettings ;
7781import com .google .common .base .Function ;
7882import com .google .common .base .Joiner ;
7983import com .google .common .base .Preconditions ;
8084import com .google .common .collect .Lists ;
8185import com .google .common .util .concurrent .ThreadFactoryBuilder ;
86+ import com .google .devtools .cloudtrace .v1 .GetTraceRequest ;
87+ import com .google .devtools .cloudtrace .v1 .Trace ;
88+ import com .google .devtools .cloudtrace .v1 .TraceSpan ;
8289import com .google .longrunning .Operation ;
8390import com .google .protobuf .ByteString ;
8491import com .google .protobuf .util .Timestamps ;
152159import com .google .spanner .v1 .TypeCode ;
153160import io .grpc .Status ;
154161import io .grpc .stub .StreamObserver ;
162+ import io .opentelemetry .api .trace .Span ;
163+ import io .opentelemetry .api .trace .Tracer ;
164+ import io .opentelemetry .context .Scope ;
155165import java .io .ByteArrayInputStream ;
156166import java .io .File ;
157167import java .io .IOException ;
166176import java .util .Objects ;
167177import java .util .concurrent .ExecutionException ;
168178import java .util .concurrent .Executor ;
179+ import java .util .concurrent .ExecutorService ;
169180import java .util .concurrent .Executors ;
181+ import java .util .concurrent .Future ;
170182import java .util .concurrent .TimeUnit ;
171183import java .util .logging .Level ;
172184import java .util .logging .Logger ;
@@ -332,24 +344,28 @@ public void startRWTransaction() throws Exception {
332344 // Try to commit
333345 return null ;
334346 };
347+ io .opentelemetry .context .Context context = io .opentelemetry .context .Context .current ();
335348 Runnable runnable =
336- () -> {
337- try {
338- runner =
339- optimistic
340- ? dbClient .readWriteTransaction (Options .optimisticLock ())
341- : dbClient .readWriteTransaction ();
342- LOGGER .log (Level .INFO , String .format ("Ready to run callable %s\n " , transactionSeed ));
343- runner .run (callable );
344- transactionSucceeded (runner .getCommitTimestamp ().toProto ());
345- } catch (SpannerException e ) {
346- LOGGER .log (
347- Level .WARNING ,
348- String .format ("Transaction runnable failed with exception %s\n " , e .getMessage ()),
349- e );
350- transactionFailed (e );
351- }
352- };
349+ context .wrap (
350+ () -> {
351+ try {
352+ runner =
353+ optimistic
354+ ? dbClient .readWriteTransaction (Options .optimisticLock ())
355+ : dbClient .readWriteTransaction ();
356+ LOGGER .log (
357+ Level .INFO , String .format ("Ready to run callable %s\n " , transactionSeed ));
358+ runner .run (callable );
359+ transactionSucceeded (runner .getCommitTimestamp ().toProto ());
360+ } catch (SpannerException e ) {
361+ LOGGER .log (
362+ Level .WARNING ,
363+ String .format (
364+ "Transaction runnable failed with exception %s\n " , e .getMessage ()),
365+ e );
366+ transactionFailed (e );
367+ }
368+ });
353369 LOGGER .log (
354370 Level .INFO ,
355371 String .format ("Callable and Runnable created, ready to execute %s\n " , transactionSeed ));
@@ -753,6 +769,11 @@ public synchronized void closeBatchTxn() throws SpannerException {
753769 Executors .newCachedThreadPool (
754770 new ThreadFactoryBuilder ().setNameFormat ("action-pool-%d" ).build ());
755771
772+ // Thread pool to verify end to end traces.
773+ private static final ExecutorService endToEndTracesThreadPool =
774+ Executors .newCachedThreadPool (
775+ new ThreadFactoryBuilder ().setNameFormat ("end-to-end-traces-pool-%d" ).build ());
776+
756777 private synchronized Spanner getClientWithTimeout (
757778 long timeoutSeconds , boolean useMultiplexedSession ) throws IOException {
758779 if (clientWithTimeout != null ) {
@@ -818,6 +839,8 @@ private synchronized Spanner getClient(long timeoutSeconds, boolean useMultiplex
818839 .setHost (HOST_PREFIX + WorkerProxy .spannerPort )
819840 .setCredentials (credentials )
820841 .setChannelProvider (channelProvider )
842+ .setEnableEndToEndTracing (true )
843+ .setOpenTelemetry (WorkerProxy .openTelemetrySdk )
821844 .setSessionPoolOption (sessionPoolOptions );
822845
823846 SpannerStubSettings .Builder stubSettingsBuilder =
@@ -841,6 +864,88 @@ private synchronized Spanner getClient(long timeoutSeconds, boolean useMultiplex
841864 return optionsBuilder .build ().getService ();
842865 }
843866
867+ private TraceServiceClient traceServiceClient ;
868+
869+ // Return the trace service client, create one if not exists.
870+ private synchronized TraceServiceClient getTraceServiceClient () throws IOException {
871+ if (traceServiceClient != null ) {
872+ return traceServiceClient ;
873+ }
874+ // Create a trace service client
875+ Credentials credentials ;
876+ if (WorkerProxy .serviceKeyFile .isEmpty ()) {
877+ credentials = NoCredentials .getInstance ();
878+ } else {
879+ credentials =
880+ GoogleCredentials .fromStream (
881+ new ByteArrayInputStream (
882+ FileUtils .readFileToByteArray (new File (WorkerProxy .serviceKeyFile ))),
883+ HTTP_TRANSPORT_FACTORY );
884+ }
885+
886+ TraceServiceSettings traceServiceSettings =
887+ TraceServiceSettings .newBuilder ()
888+ .setEndpoint (WorkerProxy .CLOUD_TRACE_ENDPOINT )
889+ .setCredentialsProvider (FixedCredentialsProvider .create (credentials ))
890+ .build ();
891+
892+ traceServiceClient = TraceServiceClient .create (traceServiceSettings );
893+ return traceServiceClient ;
894+ }
895+
896+ public Future <Boolean > getEndToEndTraceVerificationTask (String traceId ) {
897+ return endToEndTracesThreadPool .submit (
898+ () -> {
899+ try {
900+ // Wait for 10 seconds before verifying to ensure traces are exported.
901+ long sleepDuration = TimeUnit .SECONDS .toMillis (10 );
902+ LOGGER .log (
903+ Level .INFO ,
904+ String .format (
905+ "Sleeping for %d milliseconds before verifying end to end trace" ,
906+ sleepDuration ));
907+ Thread .sleep (sleepDuration );
908+ } catch (InterruptedException e ) {
909+ Thread .currentThread ().interrupt (); // Handle interruption
910+ LOGGER .log (Level .INFO , String .format ("Thread interrupted." ));
911+ return false ; // Return false if interrupted
912+ }
913+ return isExportedEndToEndTraceValid (traceId );
914+ });
915+ }
916+
917+ private static final String READ_WRITE_TRANSACTION = "CloudSpanner.ReadWriteTransaction" ;
918+ private static final String READ_ONLY_TRANSACTION = "CloudSpanner.ReadOnlyTransaction" ;
919+
920+ /* Returns whether a exported trace is valid. */
921+ public boolean isExportedEndToEndTraceValid (String traceId ) {
922+ try {
923+ GetTraceRequest getTraceRequest =
924+ GetTraceRequest .newBuilder ()
925+ .setProjectId (WorkerProxy .PROJECT_ID )
926+ .setTraceId (traceId )
927+ .build ();
928+ Trace trace = getTraceServiceClient ().getTrace (getTraceRequest );
929+ boolean readWriteOrReadOnlyTxnPresent = false , spannerServerSideSpanPresent = false ;
930+ for (TraceSpan span : trace .getSpansList ()) {
931+ if (span .getName ().contains (READ_ONLY_TRANSACTION )
932+ || span .getName ().contains (READ_WRITE_TRANSACTION )) {
933+ readWriteOrReadOnlyTxnPresent = true ;
934+ }
935+ if (span .getName ().startsWith ("Spanner." )) {
936+ spannerServerSideSpanPresent = true ;
937+ }
938+ }
939+ if (readWriteOrReadOnlyTxnPresent && !spannerServerSideSpanPresent ) {
940+ return false ;
941+ }
942+ } catch (Exception e ) {
943+ LOGGER .log (Level .WARNING , "Failed to verify end to end trace." , e );
944+ return false ;
945+ }
946+ return true ;
947+ }
948+
844949 /** Handle actions. */
845950 public Status startHandlingRequest (
846951 SpannerAsyncActionRequest req , ExecutionFlowContext executionContext ) {
@@ -865,17 +970,20 @@ public Status startHandlingRequest(
865970 useMultiplexedSession = false ;
866971 }
867972
973+ io .opentelemetry .context .Context context = io .opentelemetry .context .Context .current ();
868974 actionThreadPool .execute (
869- () -> {
870- Status status =
871- executeAction (outcomeSender , action , dbPath , useMultiplexedSession , executionContext );
872- if (!status .isOk ()) {
873- LOGGER .log (
874- Level .WARNING ,
875- String .format ("Failed to execute action with error: %s\n %s" , status , action ));
876- executionContext .onError (status .getCause ());
877- }
878- });
975+ context .wrap (
976+ () -> {
977+ Status status =
978+ executeAction (
979+ outcomeSender , action , dbPath , useMultiplexedSession , executionContext );
980+ if (!status .isOk ()) {
981+ LOGGER .log (
982+ Level .WARNING ,
983+ String .format ("Failed to execute action with error: %s\n %s" , status , action ));
984+ executionContext .onError (status .getCause ());
985+ }
986+ }));
879987 return Status .OK ;
880988 }
881989
@@ -886,7 +994,10 @@ private Status executeAction(
886994 String dbPath ,
887995 boolean useMultiplexedSession ,
888996 ExecutionFlowContext executionContext ) {
889-
997+ Tracer tracer = WorkerProxy .openTelemetrySdk .getTracer (CloudClientExecutor .class .getName ());
998+ String actionType = action .getActionCase ().toString ();
999+ Span span = tracer .spanBuilder (String .format ("performaction_%s" , actionType )).startSpan ();
1000+ Scope scope = span .makeCurrent ();
8901001 try {
8911002 if (action .hasAdmin ()) {
8921003 return executeAdminAction (useMultiplexedSession , action .getAdmin (), outcomeSender );
@@ -959,11 +1070,15 @@ private Status executeAction(
9591070 ErrorCode .UNIMPLEMENTED , "Not implemented yet: \n " + action )));
9601071 }
9611072 } catch (Exception e ) {
1073+ span .recordException (e );
9621074 LOGGER .log (Level .WARNING , "Unexpected error: " + e .getMessage ());
9631075 return outcomeSender .finishWithError (
9641076 toStatus (
9651077 SpannerExceptionFactory .newSpannerException (
9661078 ErrorCode .INVALID_ARGUMENT , "Unexpected error: " + e .getMessage ())));
1079+ } finally {
1080+ scope .close ();
1081+ span .end ();
9671082 }
9681083 }
9691084
0 commit comments