Skip to content

Commit

Permalink
change from static time methods to clock (#1477)
Browse files Browse the repository at this point in the history
  • Loading branch information
pongad authored Dec 15, 2016
1 parent 4dd7d79 commit daaf061
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.google.auth.Credentials;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.Clock;
import com.google.cloud.pubsub.Subscriber.MessageReceiver.AckReply;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -152,6 +153,7 @@ final class Builder {

Optional<ScheduledExecutorService> executor;
Optional<ManagedChannelBuilder<? extends ManagedChannelBuilder<?>>> channelBuilder;
Optional<Clock> clock;

/**
* Constructs a new {@link Builder}.
Expand Down Expand Up @@ -180,6 +182,7 @@ private void setDefaults() {
maxOutstandingBytes = Optional.absent();
maxOutstandingMessages = Optional.absent();
executor = Optional.absent();
clock = Optional.absent();
}

/**
Expand Down Expand Up @@ -238,7 +241,7 @@ public Builder setMaxOutstandingBytes(int maxOutstandingBytes) {
/**
* Set acknowledgement expiration padding.
*
* <p>This is the time accounted before a message expiration is to happen, so the
* <p>This is the time accounted before a message expiration is to happen, so the
* {@link Subscriber} is able to send an ack extension beforehand.
*
* <p>This padding duration is configurable so you can account for network latency. A reasonable
Expand All @@ -259,6 +262,12 @@ public Builder setExecutor(ScheduledExecutorService executor) {
return this;
}

/** Gives the ability to set a custom executor. */
public Builder setClock(Clock clock) {
this.clock = Optional.of(clock);
return this;
}

public Subscriber build() throws IOException {
return new SubscriberImpl(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.google.cloud.pubsub;

import com.google.auth.Credentials;
import com.google.cloud.Clock;
import com.google.cloud.pubsub.Subscriber.MessageReceiver;
import com.google.cloud.pubsub.Subscriber.MessageReceiver.AckReply;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -70,6 +71,7 @@ final class SubscriberConnection extends AbstractService {

private final Duration ackExpirationPadding;
private final ScheduledExecutorService executor;
private final Clock clock;
private final MessageReceiver receiver;
private final String subscription;
private Duration channelReconnectBackoff = INITIAL_CHANNEL_RECONNECT_BACKOFF;
Expand All @@ -96,7 +98,7 @@ final class SubscriberConnection extends AbstractService {
// To keep track of number of seconds the receiver takes to process messages.
private final Distribution ackLatencyDistribution;

public SubscriberConnection(
SubscriberConnection(
String subscription,
Credentials credentials,
MessageReceiver receiver,
Expand All @@ -105,8 +107,10 @@ public SubscriberConnection(
Distribution ackLatencyDistribution,
Channel channel,
FlowController flowController,
ScheduledExecutorService executor) {
ScheduledExecutorService executor,
Clock clock) {
this.executor = executor;
this.clock = clock;
this.credentials = credentials;
this.ackExpirationPadding = ackExpirationPadding;
this.streamAckDeadlineSeconds = streamAckDeadlineSeconds;
Expand All @@ -125,16 +129,18 @@ public SubscriberConnection(
}

private static class ExpirationInfo implements Comparable<ExpirationInfo> {
private final Clock clock;
Instant expiration;
int nextExtensionSeconds;

ExpirationInfo(Instant expiration, int initialAckDeadlineExtension) {
ExpirationInfo(Clock clock, Instant expiration, int initialAckDeadlineExtension) {
this.clock = clock;
this.expiration = expiration;
nextExtensionSeconds = initialAckDeadlineExtension;
}

void extendExpiration() {
expiration = Instant.now().plus(Duration.standardSeconds(nextExtensionSeconds));
expiration = new Instant(clock.millis()).plus(Duration.standardSeconds(nextExtensionSeconds));
nextExtensionSeconds = 2 * nextExtensionSeconds;
}

Expand Down Expand Up @@ -181,7 +187,7 @@ private class AckHandler implements FutureCallback<AckReply>, Comparable<AckHand
this.ackId = ackId;
this.outstandingBytes = outstandingBytes;
acked = new AtomicBoolean(false);
receivedTime = Instant.now();
receivedTime = new Instant(clock.millis());
}

@Override
Expand Down Expand Up @@ -216,7 +222,7 @@ public void onSuccess(AckReply reply) {
// Record the latency rounded to the next closest integer.
ackLatencyDistribution.record(
Ints.saturatedCast(
(long) Math.ceil(new Duration(receivedTime, Instant.now()).getMillis() / 1000D)));
(long) Math.ceil((clock.millis() - receivedTime.getMillis()) / 1000D)));
messagesWaiter.incrementPendingMessages(-1);
return;
case NACK:
Expand Down Expand Up @@ -361,7 +367,7 @@ private void processReceivedMessages(StreamingPullResponse response) {
final List<com.google.pubsub.v1.ReceivedMessage> responseMessages =
response.getReceivedMessagesList();
try {
Instant now = Instant.now();
Instant now = new Instant(clock.millis());
int receivedMessagesCount = response.getReceivedMessagesCount();
int totalByteCount = 0;
final List<AckHandler> ackHandlers = new ArrayList<>(responseMessages.size());
Expand All @@ -372,7 +378,9 @@ private void processReceivedMessages(StreamingPullResponse response) {
}
ExpirationInfo expiration =
new ExpirationInfo(
now.plus(streamAckDeadlineSeconds * 1000), INITIAL_ACK_DEADLINE_EXTENSION_SECONDS);
clock,
now.plus(streamAckDeadlineSeconds * 1000),
INITIAL_ACK_DEADLINE_EXTENSION_SECONDS);
synchronized (outstandingAckHandlers) {
addOutstadingAckHandlers(expiration, ackHandlers);
}
Expand Down Expand Up @@ -451,7 +459,7 @@ public void run() {
alarmsLock.unlock();
}

Instant now = Instant.now();
Instant now = new Instant(clock.millis());
// Rounded to the next second, so we only schedule future alarms at the second
// resolution.
Instant cutOverTime =
Expand Down Expand Up @@ -531,7 +539,7 @@ private void setupNextAckDeadlineExtensionAlarm(ExpirationInfo messageExpiration
ackDeadlineExtensionAlarm =
executor.schedule(
new AckDeadlineAlarm(),
nextAckDeadlineExtensionAlarmTime.getMillis() - Instant.now().getMillis(),
nextAckDeadlineExtensionAlarmTime.getMillis() - clock.millis(),
TimeUnit.MILLISECONDS);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.google.auth.Credentials;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.Clock;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.primitives.Ints;
Expand Down Expand Up @@ -60,6 +61,7 @@ public class SubscriberImpl extends AbstractService implements Subscriber {
private final ScheduledExecutorService executor;
private final Distribution ackLatencyDistribution =
new Distribution(MAX_ACK_DEADLINE_SECONDS + 1);
private final Clock clock;
private ScheduledFuture<?> ackDeadlineUpdater;
private int streamAckDeadlineSeconds;

Expand All @@ -72,6 +74,7 @@ public SubscriberImpl(SubscriberImpl.Builder builder) throws IOException {
Math.max(
INITIAL_ACK_DEADLINE_SECONDS,
Ints.saturatedCast(ackExpirationPadding.getStandardSeconds()));
clock = builder.clock.isPresent() ? builder.clock.get() : Clock.defaultClock();

int numChannels = Math.max(1, Runtime.getRuntime().availableProcessors()) * CHANNELS_PER_CORE;
executor =
Expand Down Expand Up @@ -113,7 +116,8 @@ public SubscriberImpl(SubscriberImpl.Builder builder) throws IOException {
ackLatencyDistribution,
channelBuilder.build(),
flowController,
executor);
executor,
clock);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import static org.junit.Assert.assertTrue;

import com.google.cloud.Clock;
import com.google.cloud.GrpcServiceOptions.ExecutorFactory;
import com.google.common.collect.ImmutableList;
import java.util.concurrent.CountDownLatch;
Expand All @@ -30,7 +29,6 @@
import java.util.concurrent.atomic.AtomicLong;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.joda.time.DateTimeUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
Expand All @@ -51,12 +49,6 @@ public class AckDeadlineRenewerTest {
private PubSub pubsub;
private FakeScheduledExecutorService executorService;
private AckDeadlineRenewer ackDeadlineRenewer;
private final Clock clock = new Clock() {
@Override
public long millis() {
return DateTimeUtils.currentTimeMillis();
}
};

@Rule
public Timeout globalTimeout = Timeout.seconds(60);
Expand All @@ -78,7 +70,7 @@ public void release(ExecutorService executor) {
PubSubOptions options = PubSubOptions.newBuilder()
.setProjectId("projectId")
.setExecutorFactory(executorFactory)
.setClock(clock)
.setClock(executorService.getClock())
.build();
EasyMock.expect(pubsub.getOptions()).andReturn(options);
EasyMock.replay(pubsub);
Expand All @@ -97,7 +89,7 @@ private IAnswer<Future<Void>> createAnswer(final CountDownLatch latch,
@Override
public Future<Void> answer() throws Throwable {
latch.countDown();
renewal.set(clock.millis());
renewal.set(executorService.getClock().millis());
return null;
}
};
Expand All @@ -117,7 +109,7 @@ public void testAddOneMessage() throws InterruptedException {
TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1)))
.andAnswer(createAnswer(secondLatch, secondRenewal));
EasyMock.replay(pubsub);
long addTime = clock.millis();
long addTime = executorService.getClock().millis();
ackDeadlineRenewer.add(SUBSCRIPTION1, ACK_ID1);
executorService.tick(TIME_ADVANCE, TimeUnit.MILLISECONDS);
firstLatch.await();
Expand Down Expand Up @@ -149,7 +141,7 @@ public void testAddMessages() throws InterruptedException {
TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1, ACK_ID3)))
.andAnswer(createAnswer(secondLatch, secondRenewalSub2));
EasyMock.replay(pubsub);
long addTime1 = clock.millis();
long addTime1 = executorService.getClock().millis();
ackDeadlineRenewer.add(SUBSCRIPTION1, ImmutableList.of(ACK_ID1, ACK_ID2));
ackDeadlineRenewer.add(SUBSCRIPTION2, ACK_ID1);
executorService.tick(TIME_ADVANCE, TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -185,7 +177,7 @@ public void testAddExistingMessage() throws InterruptedException {
TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1)))
.andAnswer(createAnswer(secondLatch, secondRenewalSub2));
EasyMock.replay(pubsub);
long addTime1 = clock.millis();
long addTime1 = executorService.getClock().millis();
ackDeadlineRenewer.add(SUBSCRIPTION1, ImmutableList.of(ACK_ID1, ACK_ID2));
ackDeadlineRenewer.add(SUBSCRIPTION2, ACK_ID1);
executorService.tick(TIME_ADVANCE, TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -221,7 +213,7 @@ public void testRemoveNonExistingMessage() throws InterruptedException {
TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1)))
.andAnswer(createAnswer(secondLatch, secondRenewalSub2));
EasyMock.replay(pubsub);
long addTime1 = clock.millis();
long addTime1 = executorService.getClock().millis();
ackDeadlineRenewer.add(SUBSCRIPTION1, ImmutableList.of(ACK_ID1, ACK_ID2));
ackDeadlineRenewer.add(SUBSCRIPTION2, ACK_ID1);
executorService.tick(TIME_ADVANCE, TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -257,7 +249,7 @@ public void testRemoveMessage() throws InterruptedException {
TimeUnit.MILLISECONDS, ImmutableList.of(ACK_ID1)))
.andAnswer(createAnswer(secondLatch, secondRenewalSub2));
EasyMock.replay(pubsub);
long addTime1 = clock.millis();
long addTime1 = executorService.getClock().millis();
ackDeadlineRenewer.add(SUBSCRIPTION1, ImmutableList.of(ACK_ID1, ACK_ID2));
ackDeadlineRenewer.add(SUBSCRIPTION2, ACK_ID1);
executorService.tick(TIME_ADVANCE, TimeUnit.MILLISECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.google.cloud.pubsub;

import com.google.cloud.Clock;
import com.google.cloud.pubsub.FakeClock;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.SettableFuture;
import java.util.ArrayList;
Expand All @@ -33,7 +35,6 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.joda.time.DateTime;
import org.joda.time.DateTimeUtils;
import org.joda.time.Duration;
import org.joda.time.MutableDateTime;

Expand All @@ -46,11 +47,11 @@ public class FakeScheduledExecutorService extends AbstractExecutorService

private final AtomicBoolean shutdown = new AtomicBoolean(false);
private final PriorityQueue<PendingCallable<?>> pendingCallables = new PriorityQueue<>();
private final MutableDateTime currentTime = MutableDateTime.now();
private final ExecutorService delegate = Executors.newSingleThreadExecutor();
private final FakeClock clock = new FakeClock();

public FakeScheduledExecutorService() {
DateTimeUtils.setCurrentMillisFixed(currentTime.getMillis());
public Clock getClock() {
return clock;
}

@Override
Expand Down Expand Up @@ -92,11 +93,12 @@ public void tick(long time, TimeUnit unit) {
* outstanding callable which execution time has passed.
*/
public void advanceTime(Duration toAdvance) {
currentTime.add(toAdvance);
DateTimeUtils.setCurrentMillisFixed(currentTime.getMillis());
clock.advance(toAdvance.getMillis(), TimeUnit.MILLISECONDS);
DateTime cmpTime = new DateTime(clock.millis());

synchronized (pendingCallables) {
while (!pendingCallables.isEmpty()
&& pendingCallables.peek().getScheduledTime().compareTo(currentTime) <= 0) {
&& pendingCallables.peek().getScheduledTime().compareTo(cmpTime) <= 0) {
try {
pendingCallables.poll().call();
if (shutdown.get() && pendingCallables.isEmpty()) {
Expand Down Expand Up @@ -186,7 +188,7 @@ static enum PendingCallableType {

/** Class that saves the state of an scheduled pending callable. */
class PendingCallable<T> implements Comparable<PendingCallable<T>> {
DateTime creationTime = currentTime.toDateTime();
DateTime creationTime = new DateTime(clock.millis());
Duration delay;
Callable<T> pendingCallable;
SettableFuture<T> future = SettableFuture.create();
Expand Down Expand Up @@ -221,8 +223,7 @@ ScheduledFuture<T> getScheduledFuture() {
return new ScheduledFuture<T>() {
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(
new Duration(currentTime, getScheduledTime()).getMillis(), TimeUnit.MILLISECONDS);
return unit.convert(getScheduledTime().getMillis() - clock.millis(), TimeUnit.MILLISECONDS);
}

@Override
Expand Down Expand Up @@ -279,7 +280,7 @@ T call() {
done.set(true);
break;
case FIXED_DELAY:
this.creationTime = currentTime.toDateTime();
this.creationTime = new DateTime(clock.millis());
schedulePendingCallable(this);
break;
case FIXED_RATE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,21 +55,21 @@ public static enum CloseSide {
public static final class ModifyAckDeadline {
private final String ackId;
private final long seconds;

public ModifyAckDeadline(String ackId, long seconds) {
Preconditions.checkNotNull(ackId);
this.ackId = ackId;
this.seconds = seconds;
}

public String getAckId() {
return ackId;
}

public long getSeconds() {
return seconds;
}

@Override
public boolean equals(Object obj) {
if (!(obj instanceof ModifyAckDeadline)) {
Expand All @@ -78,7 +78,7 @@ public boolean equals(Object obj) {
ModifyAckDeadline other = (ModifyAckDeadline) obj;
return other.ackId.equals(this.ackId) && other.seconds == this.seconds;
}

@Override
public int hashCode() {
return ackId.hashCode();
Expand Down
Loading

0 comments on commit daaf061

Please sign in to comment.