Skip to content
This repository was archived by the owner on Nov 14, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,16 @@ private enum State {
private final Lock lock = new ReentrantLock(false);
private final Condition condition = lock.newCondition();

@GuardedBy(value = "lock")
@GuardedBy("lock")
private V returnValue;

@GuardedBy(value = "lock")
@GuardedBy("lock")
private Throwable executionException;

@GuardedBy(value = "lock")
@GuardedBy("lock")
private volatile CancellationException cancellationException;

@GuardedBy(value = "lock")
@GuardedBy("lock")
private volatile State state = State.WAITING_TO_RUN;

private final FutureTask<?> futureTask = new FutureTask<Void>(
Expand Down Expand Up @@ -113,10 +113,14 @@ public final boolean cancel(boolean mayInterruptIfRunning) {
return futureTask.cancel(mayInterruptIfRunning);
}

@SuppressWarnings("GuardedByChecker")
protected void noteFinished() {
state = State.COMPLETED;
condition.signalAll();
lock.lock();
try {
state = State.COMPLETED;
condition.signalAll();
} finally {
lock.unlock();
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively I think we can annotate this method with @GuardedBy("lock") to ensure the lock is already held.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good call and simplifies things


/**
Expand Down Expand Up @@ -157,31 +161,43 @@ public final V get(long timeout, TimeUnit unit)
}
}

@SuppressWarnings("GuardedByChecker")
private V getReturnValue() throws ExecutionException, CancellationException {
if (cancellationException != null) {
throw Throwables.chain(
new CancellationException("This task was canceled before it ever ran."), cancellationException);
}
if (executionException != null) {
throw new ExecutionException(executionException);
lock.lock();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect we can do the same here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated both

try {
if (cancellationException != null) {
throw Throwables.chain(
new CancellationException("This task was canceled before it ever ran."), cancellationException);
}
if (executionException != null) {
throw new ExecutionException(executionException);
}
return returnValue;
} finally {
lock.unlock();
}
return returnValue;
}

/**
* @return true if and only if the task was canceled before it ever executed
*/
@Override
@SuppressWarnings("GuardedByChecker")
public final boolean isCancelled() {
return cancellationException != null;
lock.lock();
try {
return cancellationException != null;
} finally {
lock.unlock();
}
}

@Override
@SuppressWarnings("GuardedByChecker")
public final boolean isDone() {
return state == State.COMPLETED;
lock.lock();
try {
return state == State.COMPLETED;
} finally {
lock.unlock();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ protected String getInitializingClassName() {
private final PhysicalBoundStoreStrategy physicalBoundStoreStrategy;
private final InitializingWrapper wrapper = new InitializingWrapper();

@GuardedBy("this") // lazy init to avoid db connections in constructors
private DBType dbType;
private volatile DBType dbType;

@GuardedBy("this")
private Long currentLimit = null;
Expand Down Expand Up @@ -131,7 +130,6 @@ private InDbTimestampBoundStore(
this.physicalBoundStoreStrategy = physicalBoundStoreStrategy;
}

@SuppressWarnings("GuardedBy") // TODO (jkong): synchronize?
private void init() {
try (Connection conn = connManager.getConnection()) {
physicalBoundStoreStrategy.createTimestampTable(conn, this::getDbType);
Expand Down Expand Up @@ -222,11 +220,17 @@ public synchronized void storeUpperLimit(final long limit) {
});
}

@GuardedBy("this")
private DBType getDbType(Connection connection) {
if (dbType == null) {
dbType = ConnectionDbTypes.getDbType(connection);
DBType type = this.dbType; // volatile read
if (type == null) {
synchronized (this) {
type = this.dbType;
if (type == null) {
type = ConnectionDbTypes.getDbType(connection);
this.dbType = type;
}
}
}
return dbType;
return type;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -244,10 +244,9 @@ private synchronized boolean isNewEvent(LockWatchEvent event) {
.orElse(true);
}

@SuppressWarnings("GuardedBy") // The stream operation is terminal within this synchronized method.
private synchronized void assertNoSnapshotsMissing(Multimap<Sequence, StartTimestamp> reversedMap) {
Set<Sequence> sequences = reversedMap.keySet();
if (sequences.stream().map(snapshotStore::getSnapshotForSequence).anyMatch(Optional::isEmpty)) {
if (sequences.stream().map(this::getSnapshotForSequence).anyMatch(Optional::isEmpty)) {
log.warn(
"snapshots were not taken for all sequences; logging additional information",
SafeArg.of("numSequences", sequences),
Expand All @@ -261,6 +260,12 @@ private synchronized void assertNoSnapshotsMissing(Multimap<Sequence, StartTimes
}
}

private Optional<ValueCacheSnapshot> getSnapshotForSequence(Sequence sequence) {
synchronized (this) {
return snapshotStore.getSnapshotForSequence(sequence);
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could be refactored, but as written may make the synchronization more obvious. Either way works for me

Suggested change
private Optional<ValueCacheSnapshot> getSnapshotForSequence(Sequence sequence) {
synchronized (this) {
return snapshotStore.getSnapshotForSequence(sequence);
}
}
private synchronized Optional<ValueCacheSnapshot> getSnapshotForSequence(Sequence sequence) {
return snapshotStore.getSnapshotForSequence(sequence);
}


private synchronized void updateCurrentVersion(Optional<LockWatchVersion> maybeUpdateVersion) {
maybeUpdateVersion
.filter(this::shouldUpdateVersion)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1240,7 +1240,7 @@ private static String escapeText(String input) {
break;
default:
// Check for other control characters
if (c >= 0x0000 && c <= 0x001F) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

if (c <= 0x001F) {
appendEscapedUnicode(builder, c);
} else if (Character.isHighSurrogate(c)) {
// Encode the surrogate pair using 2 six-character sequence (\\uXXXX\\uXXXX)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -657,13 +657,15 @@ public void startFirstAndFail(Transaction tx, long streamId) {
@Override
public void startSecondAndFinish(Transaction tx, long streamId) {
StreamTestStreamStore ss = StreamTestStreamStore.of(txManager, StreamTestTableFactory.of());
ss.storeStreams(tx, ImmutableMap.of(streamId, new ByteArrayInputStream(new byte[1])));
ss.storeStreams(tx, ImmutableMap.of(streamId, new ByteArrayInputStream(new byte[] {0x42})));
}
});

Optional<InputStream> stream = getStream(streamId);
assertThat(stream).isPresent();
assertThat(stream.get()).isNotNull();
try (InputStream inputStream = stream.get()) {
assertThat(inputStream).isNotNull().hasBinaryContent(new byte[] {0x42});
}
}

@Test
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ buildscript {
dependencies {
classpath 'com.netflix.nebula:gradle-info-plugin:12.1.6'
classpath 'com.netflix.nebula:nebula-publishing-plugin:20.3.0'
classpath 'com.palantir.baseline:gradle-baseline-java:5.0.0'
classpath 'com.palantir.baseline:gradle-baseline-java:5.18.0'
classpath 'com.palantir.gradle.conjure:gradle-conjure:5.41.0'
classpath 'com.palantir.gradle.consistentversions:gradle-consistent-versions:2.13.0'
classpath 'com.palantir.gradle.docker:gradle-docker:0.32.0'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,15 @@ public static boolean isQueryRegistered(String key) {
* @param dbType Look for queries registered with this override first
* @return a SQLString object representing the stored query
*/
@SuppressWarnings("GuardedByChecker")
static FinalSQLString getByKey(final String key, DBType dbType) {
assert isValidKey(key) : "Keys only consist of word characters"; // $NON-NLS-1$
assert registeredValues.containsKey(key) || registeredValuesOverride.containsKey(key)
: "Couldn't find SQLString key: " + key + ", dbtype " + dbType; // $NON-NLS-1$ //$NON-NLS-2$

FinalSQLString cached = cachedKeyed.get(key);
FinalSQLString cached;
synchronized (cacheLock) {
cached = cachedKeyed.get(key);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not clear that this get invocation must occur holding cacheLock, but it's also not clear that the lock buys us anything on volatile reads/writes either, since only access and modify operations are locked without any state checks (the runWithCacheLock method appears unused internally).

Perhaps we can remove runWithCacheLock and drop the lock, relying on the volatile reference for ordering?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The whole cachedKeyed/cacheLock here is a bit of a mess.
In a follow on PR after we unblock gradle-baseline, I do think we can probably remove runWithCacheLock (and possibly the whole cachedKeyed, cacheLock, setCachedKeyed, getCachedKeyed) as I don't see any consumers of these APIs.

Note that just removing the synchronized (cacheLock) from the volatile read still triggers the GuardedBy error (as expected), so I refactored into a separate method to explicitly do volatile read & suppress (also handle possible NPE).

error: [GuardedBy] This access should be guarded by 'SQLString.cacheLock', which is not currently held
        FinalSQLString cached = cachedKeyed.get(key);

}
if (null != cached) {
callbackOnUse.noteUse((SQLString) cached.delegate);
return cached;
Expand Down Expand Up @@ -209,7 +211,6 @@ public static boolean isValidKey(final String key) {
* @param sql The string to be used in a query
* @return a SQLString object representing the given SQL
*/
@SuppressWarnings("GuardedByChecker")
static FinalSQLString getUnregisteredQuery(String sql) {
assert !isValidKey(sql) : "Unregistered Queries should not look like keys"; // $NON-NLS-1$
return new FinalSQLString(new SQLString(sql));
Expand Down Expand Up @@ -423,9 +424,10 @@ protected static ImmutableMap<String, FinalSQLString> getCachedUnregistered() {
@Deprecated
protected static void setCachedUnregistered(ImmutableMap<String, FinalSQLString> _cachedUnregistered) {}

@SuppressWarnings("GuardedByChecker")
protected static ImmutableMap<String, FinalSQLString> getCachedKeyed() {
return cachedKeyed;
synchronized (cacheLock) {
return cachedKeyed;
}
}

protected static void setCachedKeyed(ImmutableMap<String, FinalSQLString> cachedKeyed) {
Expand Down