22
33import static org .hypertrace .core .query .service .RowChunkingOperator .chunkRows ;
44
5+ import com .google .common .collect .ImmutableMap ;
56import io .grpc .Status ;
67import io .grpc .stub .ServerCallStreamObserver ;
78import io .grpc .stub .StreamObserver ;
9+ import io .micrometer .core .instrument .Counter ;
810import io .reactivex .rxjava3 .core .Maybe ;
911import io .reactivex .rxjava3 .core .Observable ;
1012import javax .inject .Inject ;
1618import org .hypertrace .core .query .service .api .QueryServiceGrpc ;
1719import org .hypertrace .core .query .service .api .ResultSetChunk ;
1820import org .hypertrace .core .query .service .validation .QueryValidator ;
21+ import org .hypertrace .core .serviceframework .metrics .PlatformMetricsRegistry ;
1922
2023@ Singleton
2124@ Slf4j
2225class QueryServiceImpl extends QueryServiceGrpc .QueryServiceImplBase {
26+
2327 private final RequestHandlerSelector handlerSelector ;
2428 private final QueryTransformationPipeline queryTransformationPipeline ;
2529 private final QueryValidator queryValidator ;
2630
31+ private Counter requestStatusErrorCounter ;
32+ private Counter requestStatusSuccessCounter ;
33+ private static final String SERVICE_REQUESTS_STATUS_COUNTER =
34+ "hypertrace.query.service.requests.status" ;
35+
2736 @ Inject
2837 public QueryServiceImpl (
2938 RequestHandlerSelector handlerSelector ,
@@ -32,6 +41,17 @@ public QueryServiceImpl(
3241 this .handlerSelector = handlerSelector ;
3342 this .queryTransformationPipeline = queryTransformationPipeline ;
3443 this .queryValidator = queryValidator ;
44+ initMetrics ();
45+ }
46+
47+ private void initMetrics () {
48+ requestStatusErrorCounter =
49+ PlatformMetricsRegistry .registerCounter (
50+ SERVICE_REQUESTS_STATUS_COUNTER , ImmutableMap .of ("error" , "true" ));
51+
52+ requestStatusSuccessCounter =
53+ PlatformMetricsRegistry .registerCounter (
54+ SERVICE_REQUESTS_STATUS_COUNTER , ImmutableMap .of ("error" , "false" ));
3555 }
3656
3757 @ Override
@@ -45,7 +65,12 @@ public void execute(
4565 () ->
4666 this .transformAndExecute (
4767 originalRequest , requestContext .getTenantId ().orElseThrow ())))
48- .doOnError (error -> log .error ("Query failed: {}" , originalRequest , error ))
68+ .doOnError (
69+ error -> {
70+ log .error ("Query failed: {}" , originalRequest , error );
71+ requestStatusErrorCounter .increment ();
72+ })
73+ .doOnComplete (() -> requestStatusSuccessCounter .increment ())
4974 .subscribe (
5075 new ServerCallStreamRxObserver <>(
5176 (ServerCallStreamObserver <ResultSetChunk >) callStreamObserver ));
0 commit comments