Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

import static com.google.common.base.Strings.isNullOrEmpty;
Expand All @@ -51,7 +50,6 @@
import static java.util.Objects.requireNonNull;

class ContinuousTaskStatusFetcher
implements SimpleHttpResponseCallback<TaskStatus>
{
private static final Logger log = Logger.get(ContinuousTaskStatusFetcher.class);

Expand All @@ -67,8 +65,6 @@ class ContinuousTaskStatusFetcher
private final RequestErrorTracker errorTracker;
private final RemoteTaskStats stats;

private final AtomicLong currentRequestStartNanos = new AtomicLong();

@GuardedBy("this")
private boolean running;

Expand Down Expand Up @@ -154,61 +150,66 @@ private synchronized void scheduleNextRequest()

errorTracker.startRequest();
future = httpClient.executeAsync(request, createFullJsonResponseHandler(taskStatusCodec));
currentRequestStartNanos.set(System.nanoTime());
Futures.addCallback(future, new SimpleHttpResponseHandler<>(this, request.getUri(), stats), executor);
Futures.addCallback(future, new SimpleHttpResponseHandler<>(new TaskStatusResponseCallback(), request.getUri(), stats), executor);
}

TaskStatus getTaskStatus()
{
return taskStatus.get();
}

@Override
public void success(TaskStatus value)
private class TaskStatusResponseCallback
implements SimpleHttpResponseCallback<TaskStatus>
{
try (SetThreadName ignored = new SetThreadName("ContinuousTaskStatusFetcher-%s", taskId)) {
updateStats(currentRequestStartNanos.get());
try {
updateTaskStatus(value);
errorTracker.requestSucceeded();
}
finally {
scheduleNextRequest();
private final long requestStartNanos = System.nanoTime();

@Override
public void success(TaskStatus value)
{
try (SetThreadName ignored = new SetThreadName("ContinuousTaskStatusFetcher-%s", taskId)) {
updateStats(requestStartNanos);
try {
updateTaskStatus(value);
errorTracker.requestSucceeded();
}
finally {
scheduleNextRequest();
}
}
}
}

@Override
public void failed(Throwable cause)
{
try (SetThreadName ignored = new SetThreadName("ContinuousTaskStatusFetcher-%s", taskId)) {
updateStats(currentRequestStartNanos.get());
try {
// if task not already done, record error
TaskStatus taskStatus = getTaskStatus();
if (!taskStatus.getState().isDone()) {
errorTracker.requestFailed(cause);
@Override
public void failed(Throwable cause)
{
try (SetThreadName ignored = new SetThreadName("ContinuousTaskStatusFetcher-%s", taskId)) {
updateStats(requestStartNanos);
try {
// if task not already done, record error
TaskStatus taskStatus = getTaskStatus();
if (!taskStatus.getState().isDone()) {
errorTracker.requestFailed(cause);
}
}
catch (Error e) {
onFail.accept(e);
throw e;
}
catch (RuntimeException e) {
onFail.accept(e);
}
finally {
scheduleNextRequest();
}
}
catch (Error e) {
onFail.accept(e);
throw e;
}
catch (RuntimeException e) {
onFail.accept(e);
}
finally {
scheduleNextRequest();
}
}
}

@Override
public void fatal(Throwable cause)
{
try (SetThreadName ignored = new SetThreadName("ContinuousTaskStatusFetcher-%s", taskId)) {
updateStats(currentRequestStartNanos.get());
onFail.accept(cause);
@Override
public void fatal(Throwable cause)
{
try (SetThreadName ignored = new SetThreadName("ContinuousTaskStatusFetcher-%s", taskId)) {
updateStats(requestStartNanos);
onFail.accept(cause);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.net.URI;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

import static com.google.common.net.HttpHeaders.CONTENT_TYPE;
Expand All @@ -45,7 +44,6 @@
import static java.util.Objects.requireNonNull;

class DynamicFiltersFetcher
implements SimpleHttpResponseCallback<VersionedDynamicFilterDomains>
{
private final TaskId taskId;
private final URI taskUri;
Expand All @@ -57,7 +55,6 @@ class DynamicFiltersFetcher
private final RequestErrorTracker errorTracker;
private final RemoteTaskStats stats;
private final DynamicFilterService dynamicFilterService;
private final AtomicLong currentRequestStartNanos = new AtomicLong();

@GuardedBy("this")
private long dynamicFiltersVersion = INITIAL_DYNAMIC_FILTERS_VERSION;
Expand Down Expand Up @@ -158,52 +155,57 @@ private synchronized void fetchDynamicFiltersIfNecessary()

errorTracker.startRequest();
future = httpClient.executeAsync(request, createFullJsonResponseHandler(dynamicFilterDomainsCodec));
currentRequestStartNanos.set(System.nanoTime());
addCallback(future, new SimpleHttpResponseHandler<>(this, request.getUri(), stats), executor);
addCallback(future, new SimpleHttpResponseHandler<>(new DynamicFiltersResponseCallback(), request.getUri(), stats), executor);
}

@Override
public void success(VersionedDynamicFilterDomains newDynamicFilterDomains)
private class DynamicFiltersResponseCallback
implements SimpleHttpResponseCallback<VersionedDynamicFilterDomains>
{
try (SetThreadName ignored = new SetThreadName("DynamicFiltersFetcher-%s", taskId)) {
updateStats(currentRequestStartNanos.get());
try {
updateDynamicFilterDomains(newDynamicFilterDomains);
errorTracker.requestSucceeded();
}
finally {
fetchDynamicFiltersIfNecessary();
private final long requestStartNanos = System.nanoTime();

@Override
public void success(VersionedDynamicFilterDomains newDynamicFilterDomains)
{
try (SetThreadName ignored = new SetThreadName("DynamicFiltersFetcher-%s", taskId)) {
updateStats(requestStartNanos);
try {
updateDynamicFilterDomains(newDynamicFilterDomains);
errorTracker.requestSucceeded();
}
finally {
fetchDynamicFiltersIfNecessary();
}
}
}
}

@Override
public void failed(Throwable cause)
{
try (SetThreadName ignored = new SetThreadName("DynamicFiltersFetcher-%s", taskId)) {
updateStats(currentRequestStartNanos.get());
try {
errorTracker.requestFailed(cause);
}
catch (Error e) {
onFail.accept(e);
throw e;
}
catch (RuntimeException e) {
onFail.accept(e);
}
finally {
fetchDynamicFiltersIfNecessary();
@Override
public void failed(Throwable cause)
{
try (SetThreadName ignored = new SetThreadName("DynamicFiltersFetcher-%s", taskId)) {
updateStats(requestStartNanos);
try {
errorTracker.requestFailed(cause);
}
catch (Error e) {
onFail.accept(e);
throw e;
}
catch (RuntimeException e) {
onFail.accept(e);
}
finally {
fetchDynamicFiltersIfNecessary();
}
}
}
}

@Override
public void fatal(Throwable cause)
{
try (SetThreadName ignored = new SetThreadName("DynamicFiltersFetcher-%s", taskId)) {
updateStats(currentRequestStartNanos.get());
onFail.accept(cause);
@Override
public void fatal(Throwable cause)
{
try (SetThreadName ignored = new SetThreadName("DynamicFiltersFetcher-%s", taskId)) {
updateStats(requestStartNanos);
onFail.accept(cause);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import static java.util.concurrent.TimeUnit.MILLISECONDS;

public class TaskInfoFetcher
implements SimpleHttpResponseCallback<TaskInfo>
{
private final TaskId taskId;
private final Consumer<Throwable> onFail;
Expand All @@ -67,10 +66,6 @@ public class TaskInfoFetcher
private final RequestErrorTracker errorTracker;

private final boolean summarizeTaskInfo;

@GuardedBy("this")
private final AtomicLong currentRequestStartNanos = new AtomicLong();

private final RemoteTaskStats stats;

@GuardedBy("this")
Expand Down Expand Up @@ -212,8 +207,7 @@ private synchronized void sendNextRequest()

errorTracker.startRequest();
future = httpClient.executeAsync(request, createFullJsonResponseHandler(taskInfoCodec));
currentRequestStartNanos.set(System.nanoTime());
Futures.addCallback(future, new SimpleHttpResponseHandler<>(this, request.getUri(), stats), executor);
Futures.addCallback(future, new SimpleHttpResponseHandler<>(new TaskInfoResponseCallback(), request.getUri(), stats), executor);
}

synchronized void updateTaskInfo(TaskInfo newTaskInfo)
Expand Down Expand Up @@ -247,49 +241,51 @@ synchronized void updateTaskInfo(TaskInfo newTaskInfo)
}
}

@Override
public void success(TaskInfo newValue)
private class TaskInfoResponseCallback
implements SimpleHttpResponseCallback<TaskInfo>
{
try (SetThreadName ignored = new SetThreadName("TaskInfoFetcher-%s", taskId)) {
lastUpdateNanos.set(System.nanoTime());
private final long requestStartNanos = System.nanoTime();

long startNanos;
synchronized (this) {
startNanos = this.currentRequestStartNanos.get();
@Override
public void success(TaskInfo newValue)
{
try (SetThreadName ignored = new SetThreadName("TaskInfoFetcher-%s", taskId)) {
lastUpdateNanos.set(System.nanoTime());

updateStats(requestStartNanos);
errorTracker.requestSucceeded();
updateTaskInfo(newValue);
}
updateStats(startNanos);
errorTracker.requestSucceeded();
updateTaskInfo(newValue);
}
}

@Override
public void failed(Throwable cause)
{
try (SetThreadName ignored = new SetThreadName("TaskInfoFetcher-%s", taskId)) {
lastUpdateNanos.set(System.nanoTime());

try {
// if task not already done, record error
if (!isDone(getTaskInfo())) {
errorTracker.requestFailed(cause);
@Override
public void failed(Throwable cause)
{
try (SetThreadName ignored = new SetThreadName("TaskInfoFetcher-%s", taskId)) {
lastUpdateNanos.set(System.nanoTime());

try {
// if task not already done, record error
if (!isDone(getTaskInfo())) {
errorTracker.requestFailed(cause);
}
}
catch (Error e) {
onFail.accept(e);
throw e;
}
catch (RuntimeException e) {
onFail.accept(e);
}
}
catch (Error e) {
onFail.accept(e);
throw e;
}
catch (RuntimeException e) {
onFail.accept(e);
}
}
}

@Override
public void fatal(Throwable cause)
{
try (SetThreadName ignored = new SetThreadName("TaskInfoFetcher-%s", taskId)) {
onFail.accept(cause);
@Override
public void fatal(Throwable cause)
{
try (SetThreadName ignored = new SetThreadName("TaskInfoFetcher-%s", taskId)) {
onFail.accept(cause);
}
}
}

Expand Down