Skip to content

Commit

Permalink
[pinpoint-apm#9932] Replace DefaultFuture with CompletableFuture
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed May 15, 2023
1 parent 97e287f commit 92a1bee
Show file tree
Hide file tree
Showing 29 changed files with 157 additions and 721 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,12 @@ private CompletableFuture<ResponseMessage> request0(GrpcAgentConnection grpcAgen
try {
responseFuture.get(3000, TimeUnit.MILLISECONDS);
return responseFuture;
} catch (InterruptedException | ExecutionException | TimeoutException e) {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PinpointSocketException(e);
} catch (ExecutionException e) {
throw new PinpointSocketException(e.getCause());
} catch (TimeoutException e) {
throw new PinpointSocketException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import com.navercorp.pinpoint.grpc.trace.PCmdRequest;
import com.navercorp.pinpoint.grpc.trace.PCmdResponse;
import com.navercorp.pinpoint.profiler.context.thrift.CommandGrpcToThriftMessageConverter;
import com.navercorp.pinpoint.rpc.DefaultFuture;
import com.navercorp.pinpoint.rpc.PinpointSocketException;
import com.navercorp.pinpoint.rpc.ResponseMessage;
import com.navercorp.pinpoint.rpc.client.RequestManager;
Expand All @@ -44,7 +43,6 @@
import com.navercorp.pinpoint.rpc.stream.StreamChannel;
import com.navercorp.pinpoint.rpc.stream.StreamChannelRepository;
import com.navercorp.pinpoint.rpc.stream.StreamException;
import com.navercorp.pinpoint.rpc.util.Futures;
import com.navercorp.pinpoint.thrift.dto.command.TRouteResult;
import com.navercorp.pinpoint.thrift.io.CommandHeaderTBaseSerializerFactory;
import com.navercorp.pinpoint.thrift.util.SerializationUtils;
Expand Down Expand Up @@ -145,10 +143,9 @@ public CompletableFuture<ResponseMessage> request(GeneratedMessageV3 message) {
return createFailedFuture(new PinpointSocketException(TRouteResult.NOT_SUPPORTED_REQUEST.name()));
}

DefaultFuture<ResponseMessage> future = requestManager.register(request.getRequestId());
CompletableFuture<ResponseMessage> future = requestManager.register(request.getRequestId());
requestObserver.onNext(request);

return Futures.toCompletableFuture(future);
return future;
}

// 1st message : client(collector) -> server(agent)
Expand Down Expand Up @@ -347,9 +344,9 @@ public void close(SocketStateCode toState) {
}

private void setFailMessageToFuture(int responseId, String message) {
DefaultFuture<ResponseMessage> future = requestManager.removeMessageFuture(responseId);
CompletableFuture<ResponseMessage> future = requestManager.removeMessageFuture(responseId);
if (future != null) {
future.setFailure(new PinpointSocketException(message));
future.completeExceptionally(new PinpointSocketException(message));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@
import java.util.List;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.BiConsumer;

/**
* @author Jongho Moon
Expand Down Expand Up @@ -140,7 +139,7 @@ public boolean request(MetaDataType data, int retry) {
}

@Override
public boolean request(MetaDataType data, Consumer<CompletableFuture<ResponseMessage>> listener) {
public boolean request(MetaDataType data, BiConsumer<ResponseMessage, Throwable> listener) {
addData(data);
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
import java.util.Objects;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

/**
Expand Down Expand Up @@ -176,9 +178,9 @@ private boolean sendAgentInfo() {
AgentInfo agentInfo = agentInfoFactory.createAgentInfo();

logger.info("Sending AgentInfo {}", agentInfo);
ResponseFutureListener<ResponseMessage> listener = new ResponseFutureListener<>();
ResponseFutureListener<ResponseMessage, Throwable> listener = new ResponseFutureListener<>();
dataSender.request(agentInfo, listener);
ResponseMessage responseMessage = listener.getResponseFeture().get(3000, TimeUnit.MILLISECONDS);
ResponseMessage responseMessage = listener.getResponseFuture().get(3000, TimeUnit.MILLISECONDS);
if (responseMessage == null) {
logger.warn("result not set");
return false;
Expand All @@ -188,11 +190,20 @@ private boolean sendAgentInfo() {
logger.warn("request unsuccessful. Cause : {}", result.getMessage());
}
return result.isSuccess();
} catch (Throwable th) {
logger.warn("failed to send agent info", th);
} catch (ExecutionException ex) {
logError(ex.getCause());
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
logError(ex);
} catch (TimeoutException ex) {
logError(ex);
}
return false;
}

private void logError(Throwable cause) {
logger.warn("failed to send agent info", cause);
}
}

public static class Builder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
package com.navercorp.pinpoint.profiler;

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.BiConsumer;

public class ResponseFutureListener<T> implements Consumer<CompletableFuture<T>> {
public class ResponseFutureListener<T, U extends Throwable> implements BiConsumer<T, U> {

private final CompletableFuture<T> future;

Expand All @@ -28,22 +28,15 @@ public ResponseFutureListener() {
}

@Override
public void accept(CompletableFuture<T> cf) {
if (!cf.isDone()) {
this.future.completeExceptionally(new IllegalStateException("ResponseMessage future is not complete"));
return;
public void accept(T message, U th) {
if (message != null) {
future.complete(message);
} else {
future.completeExceptionally(th);
}

cf.whenComplete((responseMessage, throwable) -> {
if (throwable != null) {
future.completeExceptionally(throwable);
} else {
future.complete(responseMessage);
}
});
}

public CompletableFuture<T> getResponseFeture() {
public CompletableFuture<T> getResponseFuture() {
return future;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@

import com.navercorp.pinpoint.rpc.ResponseMessage;

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.BiConsumer;


/**
Expand Down Expand Up @@ -56,7 +55,7 @@ public boolean request(T data, int retry) {


@Override
public boolean request(T data, Consumer<CompletableFuture<ResponseMessage>> listener) {
public boolean request(T data, BiConsumer<ResponseMessage, Throwable> listener) {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@

import com.navercorp.pinpoint.rpc.ResponseMessage;

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.BiConsumer;


/**
Expand All @@ -31,7 +30,7 @@ public interface EnhancedDataSender<T> extends DataSender<T> {

boolean request(T data, int retry);

boolean request(T data, Consumer<CompletableFuture<ResponseMessage>> listener);
boolean request(T data, BiConsumer<ResponseMessage, Throwable> listener);


}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.BiConsumer;


/**
Expand Down Expand Up @@ -58,7 +57,7 @@ public boolean request(T data, int retry) {


@Override
public boolean request(T data, Consumer<CompletableFuture<ResponseMessage>> listener) {
public boolean request(T data, BiConsumer<ResponseMessage, Throwable> listener) {
logger.info("request tBase:{} FutureListener:{}", data, listener);
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@

import com.navercorp.pinpoint.rpc.ResponseMessage;

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.BiConsumer;

/**
* @author Woonduk Kang(emeroad)
Expand All @@ -30,6 +29,6 @@ public interface RequestMessage<M> {

int getRetryCount();

Consumer<CompletableFuture<ResponseMessage>> getFutureListener();
BiConsumer<ResponseMessage, Throwable> getFutureListener();

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@

import com.navercorp.pinpoint.rpc.ResponseMessage;

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.BiConsumer;

/**
* @author Woonduk Kang(emeroad)
Expand All @@ -31,11 +30,11 @@ private RequestMessageFactory() {
}

public static <T> RequestMessage<T> request(T message, int retryCount) {
return new RetryRequestMessage<>(message, retryCount);
return new RetryRequestMessage<>(message, retryCount, null);
}

public static <T> RequestMessage<T> request(T message, Consumer<CompletableFuture<ResponseMessage>> futureListener) {
return new ListenerableRequestMessage<>(message, futureListener);
public static <T> RequestMessage<T> request(T message, int retryCount, BiConsumer<ResponseMessage, Throwable> futureListener) {
return new RetryRequestMessage<>(message, retryCount, futureListener);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,23 @@

package com.navercorp.pinpoint.profiler.sender;


import com.navercorp.pinpoint.rpc.ResponseMessage;

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.BiConsumer;

/**
* @author Woonduk Kang(emeroad)
*/
public class RetryRequestMessage<T> implements RequestMessage<T> {
class RetryRequestMessage<T> implements RequestMessage<T> {
private final T message;
private final int retryCount;
private final BiConsumer<ResponseMessage, Throwable> futureListener;


RetryRequestMessage(T message, int retryCount) {
RetryRequestMessage(T message, int retryCount, BiConsumer<ResponseMessage, Throwable> futureListener) {
this.message = message;
this.retryCount = retryCount;
this.futureListener = futureListener;
}


Expand All @@ -47,7 +47,7 @@ public int getRetryCount() {
}

@Override
public Consumer<CompletableFuture<ResponseMessage>> getFutureListener() {
return null;
public BiConsumer<ResponseMessage, Throwable> getFutureListener() {
return futureListener;
}
}
Loading

0 comments on commit 92a1bee

Please sign in to comment.