Skip to content
This repository was archived by the owner on Nov 14, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,16 @@ private enum State {
private final Lock lock = new ReentrantLock(false);
private final Condition condition = lock.newCondition();

@GuardedBy(value = "lock")
@GuardedBy("lock")
private V returnValue;

@GuardedBy(value = "lock")
@GuardedBy("lock")
private Throwable executionException;

@GuardedBy(value = "lock")
@GuardedBy("lock")
private volatile CancellationException cancellationException;

@GuardedBy(value = "lock")
@GuardedBy("lock")
private volatile State state = State.WAITING_TO_RUN;

private final FutureTask<?> futureTask = new FutureTask<Void>(
Expand Down Expand Up @@ -113,7 +113,7 @@ public final boolean cancel(boolean mayInterruptIfRunning) {
return futureTask.cancel(mayInterruptIfRunning);
}

@SuppressWarnings("GuardedByChecker")
@GuardedBy("lock")
protected void noteFinished() {
state = State.COMPLETED;
condition.signalAll();
Expand Down Expand Up @@ -157,7 +157,7 @@ public final V get(long timeout, TimeUnit unit)
}
}

@SuppressWarnings("GuardedByChecker")
@GuardedBy("lock")
private V getReturnValue() throws ExecutionException, CancellationException {
if (cancellationException != null) {
throw Throwables.chain(
Expand All @@ -173,15 +173,23 @@ private V getReturnValue() throws ExecutionException, CancellationException {
* @return true if and only if the task was canceled before it ever executed
*/
@Override
@SuppressWarnings("GuardedByChecker")
public final boolean isCancelled() {
return cancellationException != null;
lock.lock();
try {
return cancellationException != null;
} finally {
lock.unlock();
}
}

@Override
@SuppressWarnings("GuardedByChecker")
public final boolean isDone() {
return state == State.COMPLETED;
lock.lock();
try {
return state == State.COMPLETED;
} finally {
lock.unlock();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ protected String getInitializingClassName() {
private final PhysicalBoundStoreStrategy physicalBoundStoreStrategy;
private final InitializingWrapper wrapper = new InitializingWrapper();

@GuardedBy("this") // lazy init to avoid db connections in constructors
private DBType dbType;
private volatile DBType dbType;

@GuardedBy("this")
private Long currentLimit = null;
Expand Down Expand Up @@ -131,7 +130,6 @@ private InDbTimestampBoundStore(
this.physicalBoundStoreStrategy = physicalBoundStoreStrategy;
}

@SuppressWarnings("GuardedBy") // TODO (jkong): synchronize?
private void init() {
try (Connection conn = connManager.getConnection()) {
physicalBoundStoreStrategy.createTimestampTable(conn, this::getDbType);
Expand Down Expand Up @@ -222,11 +220,17 @@ public synchronized void storeUpperLimit(final long limit) {
});
}

@GuardedBy("this")
private DBType getDbType(Connection connection) {
if (dbType == null) {
dbType = ConnectionDbTypes.getDbType(connection);
DBType type = this.dbType; // volatile read
if (type == null) {
synchronized (this) {
type = this.dbType;
if (type == null) {
type = ConnectionDbTypes.getDbType(connection);
this.dbType = type;
}
}
}
return dbType;
return type;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -244,10 +244,9 @@ private synchronized boolean isNewEvent(LockWatchEvent event) {
.orElse(true);
}

@SuppressWarnings("GuardedBy") // The stream operation is terminal within this synchronized method.
private synchronized void assertNoSnapshotsMissing(Multimap<Sequence, StartTimestamp> reversedMap) {
Set<Sequence> sequences = reversedMap.keySet();
if (sequences.stream().map(snapshotStore::getSnapshotForSequence).anyMatch(Optional::isEmpty)) {
if (sequences.stream().map(this::getSnapshotForSequence).anyMatch(Optional::isEmpty)) {
log.warn(
"snapshots were not taken for all sequences; logging additional information",
SafeArg.of("numSequences", sequences),
Expand All @@ -261,6 +260,10 @@ private synchronized void assertNoSnapshotsMissing(Multimap<Sequence, StartTimes
}
}

private synchronized Optional<ValueCacheSnapshot> getSnapshotForSequence(Sequence sequence) {
return snapshotStore.getSnapshotForSequence(sequence);
}

private synchronized void updateCurrentVersion(Optional<LockWatchVersion> maybeUpdateVersion) {
maybeUpdateVersion
.filter(this::shouldUpdateVersion)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1240,7 +1240,7 @@ private static String escapeText(String input) {
break;
default:
// Check for other control characters
if (c >= 0x0000 && c <= 0x001F) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

if (c <= 0x001F) {
appendEscapedUnicode(builder, c);
} else if (Character.isHighSurrogate(c)) {
// Encode the surrogate pair using 2 six-character sequence (\\uXXXX\\uXXXX)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -657,13 +657,15 @@ public void startFirstAndFail(Transaction tx, long streamId) {
@Override
public void startSecondAndFinish(Transaction tx, long streamId) {
StreamTestStreamStore ss = StreamTestStreamStore.of(txManager, StreamTestTableFactory.of());
ss.storeStreams(tx, ImmutableMap.of(streamId, new ByteArrayInputStream(new byte[1])));
ss.storeStreams(tx, ImmutableMap.of(streamId, new ByteArrayInputStream(new byte[] {0x42})));
}
});

Optional<InputStream> stream = getStream(streamId);
assertThat(stream).isPresent();
assertThat(stream.get()).isNotNull();
try (InputStream inputStream = stream.get()) {
assertThat(inputStream).isNotNull().hasBinaryContent(new byte[] {0x42});
}
}

@Test
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ buildscript {
dependencies {
classpath 'com.netflix.nebula:gradle-info-plugin:12.1.6'
classpath 'com.netflix.nebula:nebula-publishing-plugin:20.3.0'
classpath 'com.palantir.baseline:gradle-baseline-java:5.0.0'
classpath 'com.palantir.baseline:gradle-baseline-java:5.18.0'
classpath 'com.palantir.gradle.conjure:gradle-conjure:5.41.0'
classpath 'com.palantir.gradle.consistentversions:gradle-consistent-versions:2.13.0'
classpath 'com.palantir.gradle.docker:gradle-docker:0.32.0'
Expand Down
18 changes: 13 additions & 5 deletions commons-db/src/main/java/com/palantir/nexus/db/sql/SQLString.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.commons.lang3.Validate;

Expand Down Expand Up @@ -86,6 +87,7 @@ protected interface CallableCheckedException<T, E extends Exception> {
* Runs the provided callable while holding the lock for the override caches.
* Callers replacing the caches should hold this lock.
*/
@Deprecated
protected static <T, E extends Exception> T runWithCacheLock(CallableCheckedException<T, E> callable) throws E {
synchronized (cacheLock) {
return callable.call();
Expand Down Expand Up @@ -165,13 +167,12 @@ public static boolean isQueryRegistered(String key) {
* @param dbType Look for queries registered with this override first
* @return a SQLString object representing the stored query
*/
@SuppressWarnings("GuardedByChecker")
static FinalSQLString getByKey(final String key, DBType dbType) {
assert isValidKey(key) : "Keys only consist of word characters"; // $NON-NLS-1$
assert registeredValues.containsKey(key) || registeredValuesOverride.containsKey(key)
: "Couldn't find SQLString key: " + key + ", dbtype " + dbType; // $NON-NLS-1$ //$NON-NLS-2$

FinalSQLString cached = cachedKeyed.get(key);
FinalSQLString cached = getCachedSql(key);
if (null != cached) {
callbackOnUse.noteUse((SQLString) cached.delegate);
return cached;
Expand All @@ -192,6 +193,13 @@ static FinalSQLString getByKey(final String key, DBType dbType) {
return valueForKey;
}

@Nullable
@SuppressWarnings("GuardedBy") // we're only doing a volatile read of current cache
private static FinalSQLString getCachedSql(String key) {
ImmutableMap<String, FinalSQLString> cache = cachedKeyed; // volatile read
return cache == null ? null : cache.get(key);
}

static FinalSQLString getByKey(String key, Connection connection) throws PalantirSqlException {
DBType type = DBType.getTypeFromConnection(connection);
return getByKey(key, type);
Expand All @@ -209,7 +217,6 @@ public static boolean isValidKey(final String key) {
* @param sql The string to be used in a query
* @return a SQLString object representing the given SQL
*/
@SuppressWarnings("GuardedByChecker")
static FinalSQLString getUnregisteredQuery(String sql) {
assert !isValidKey(sql) : "Unregistered Queries should not look like keys"; // $NON-NLS-1$
return new FinalSQLString(new SQLString(sql));
Expand Down Expand Up @@ -423,9 +430,10 @@ protected static ImmutableMap<String, FinalSQLString> getCachedUnregistered() {
@Deprecated
protected static void setCachedUnregistered(ImmutableMap<String, FinalSQLString> _cachedUnregistered) {}

@SuppressWarnings("GuardedByChecker")
protected static ImmutableMap<String, FinalSQLString> getCachedKeyed() {
return cachedKeyed;
synchronized (cacheLock) {
return cachedKeyed;
}
}

protected static void setCachedKeyed(ImmutableMap<String, FinalSQLString> cachedKeyed) {
Expand Down