entry : serverResponseMap.entrySet()) {
+ if (entry.getValue().length == 0) {
+ throw new PinotException(ErrorCode.PINOT_INSUFFICIENT_SERVER_RESPONSE, String.format("Got empty response with from server: %s", entry.getKey().getShortHostName()));
+ }
+ }
+ }
+ return serverResponseMap;
+ }
+ catch (ExecutionException | InterruptedException e) {
+ Throwable err = e instanceof ExecutionException ? e.getCause() : e;
+ throw new PinotException(ErrorCode.PINOT_UNCLASSIFIED_ERROR, String.format("Caught exception while fetching responses for table: %s", tableNameWithType), err);
+ }
+ }
+
+ /**
+ * Deserialize the server responses, put the de-serialized data table into the data table map passed in, append
+ * processing exceptions to the processing exception list passed in.
+ * For hybrid use case, multiple responses might be from the same instance. Use response sequence to distinguish
+ * them.
+ *
+ * @param responseMap map from server to response.
+ * @param tableNameWithType table name with type suffix.
+ * @return dataTableMap map from server to data table.
+ */
+ private Map deserializeServerResponses(
+ Map responseMap,
+ String tableNameWithType)
+ {
+ Map dataTableMap = new HashMap<>();
+ for (Map.Entry entry : responseMap.entrySet()) {
+ ServerInstance serverInstance = entry.getKey();
+ byte[] value = entry.getValue();
+ if (value == null || value.length == 0) {
+ continue;
+ }
+ try {
+ dataTableMap.put(serverInstance, DataTableFactory.getDataTable(value));
+ }
+ catch (IOException e) {
+ throw new PinotException(ErrorCode.PINOT_UNCLASSIFIED_ERROR, String.format("Caught exceptions while deserializing response for table: %s from server: %s", tableNameWithType, serverInstance), e);
+ }
+ }
+ return dataTableMap;
+ }
+
+ private CompositeFuture routeScatterGather(ScatterGatherRequest scatterRequest, ScatterGatherStats scatterGatherStats)
+ {
+ try {
+ return this.scatterGatherer.scatterGather(scatterRequest, scatterGatherStats, true, brokerMetrics);
+ }
+ catch (InterruptedException e) {
+ throw new PinotException(ErrorCode.PINOT_UNCLASSIFIED_ERROR, format("Interrupted while sending request: %s", scatterRequest), e);
+ }
+ }
+
+ private static class ScatterGatherRequestWrapper
+ implements ScatterGatherRequest
+ {
+ private final BrokerRequest brokerRequest;
+ private final Map> routingTable;
+ private final long requestId;
+ private final long requestTimeoutMs;
+ private final String brokerId;
+
+ public ScatterGatherRequestWrapper(BrokerRequest request, Map> routingTable, long requestId, long requestTimeoutMs, String brokerId)
+ {
+ brokerRequest = request;
+ this.routingTable = routingTable;
+ this.requestId = requestId;
+
+ this.requestTimeoutMs = requestTimeoutMs;
+ this.brokerId = brokerId;
+ }
+
+ @Override
+ public Map> getRoutingTable()
+ {
+ return routingTable;
+ }
+
+ @Override
+ public byte[] getRequestForService(List segments)
+ {
+ InstanceRequest request = new InstanceRequest();
+ request.setRequestId(requestId);
+ request.setEnableTrace(brokerRequest.isEnableTrace());
+ request.setQuery(brokerRequest);
+ request.setSearchSegments(segments);
+ request.setBrokerId(brokerId);
+ return new SerDe(new TCompactProtocol.Factory()).serialize(request);
+ }
+
+ @Override
+ public long getRequestId()
+ {
+ return requestId;
+ }
+
+ @Override
+ public long getRequestTimeoutMs()
+ {
+ return requestTimeoutMs;
+ }
+
+ @Override
+ public BrokerRequest getBrokerRequest()
+ {
+ return brokerRequest;
+ }
+
+ @Override
+ public String toString()
+ {
+ if (routingTable == null) {
+ return null;
+ }
+
+ return Arrays.toString(routingTable.entrySet().toArray());
+ }
+ }
+}