Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ private void setupClient(ThreadPool threadPool) {
@After
public void tearDownAndVerifyCommonStuff() throws Exception {
client.close();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a client was created with the same threadPool that is terminated a line below.
a client terminates a threadPool as well in NoOpClient.close() so this is possibly not needed

It might as well stay, but I think it is unecessary

terminate(threadPool);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand All @@ -60,6 +62,14 @@ public class DeprecationLogger {
*/
private static final CopyOnWriteArraySet<ThreadContext> THREAD_CONTEXT = new CopyOnWriteArraySet<>();

/**
* In order to prevent accessing closed ThreadContext in test environment
* when iterate <code>THREAD_CONTEXT</code> Set, read lock has to be acquired
* when ThreadContext is closed it has to be removed from the Set first, and removing should acquire write lock
* Since adding closed ThreadContext is not supported, this lock do not need to be acquired before adding to the Set.
*/
private static ReadWriteLock threadContextLock = new ReentrantReadWriteLock(true);

/**
* Set the {@link ThreadContext} used to add deprecation headers to network responses.
* <p>
Expand All @@ -81,16 +91,18 @@ public static void setThreadContext(ThreadContext threadContext) {
* Remove the {@link ThreadContext} used to add deprecation headers to network responses.
* <p>
* This is expected to <em>only</em> be invoked by the {@code Node}'s {@code close} method (therefore once outside of tests).
*
* Node: This method should be called before closing a <code>ThreadContext</code>.
* @see ThreadContext#close()
* @param threadContext The thread context owned by the {@code ThreadPool} (and implicitly a {@code Node})
* @throws IllegalStateException if this {@code threadContext} is unknown (and presumably already unset before)
*/
public static void removeThreadContext(ThreadContext threadContext) {
* @return true if the context was removed, false if the context was now known (possibly already removed)
**/
public static boolean removeThreadContext(ThreadContext threadContext) {
assert threadContext != null;

// remove returning false means it did not have it already
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This exception should not be thrown, closing a ThreadPool now also removes ThreadContext from a DeprecationLogger.
It is possible to close a ThreadPool multiple times as per NodesTests.testAwaitCloseTimeoutsOnNonInterruptibleTask
THis could happen when someone is calling an api multiple times.

if (THREAD_CONTEXT.remove(threadContext) == false) {
throw new IllegalStateException("Removing unknown ThreadContext not allowed!");
threadContextLock.writeLock().lock();
try {
return THREAD_CONTEXT.remove(threadContext);
} finally {
threadContextLock.writeLock().unlock();
}
}

Expand Down Expand Up @@ -227,46 +239,54 @@ void deprecated(final Set<ThreadContext> threadContexts, final String message, f
}

void deprecated(final Set<ThreadContext> threadContexts, final String message, final boolean log, final Object... params) {
final Iterator<ThreadContext> iterator = threadContexts.iterator();
if (iterator.hasNext()) {
final String formattedMessage = LoggerMessageFormat.format(message, params);
final String warningHeaderValue = formatWarning(formattedMessage);
assert WARNING_HEADER_PATTERN.matcher(warningHeaderValue).matches();
assert extractWarningValueFromWarningHeader(warningHeaderValue).equals(escapeAndEncode(formattedMessage));
while (iterator.hasNext()) {
try {
threadContextLock.readLock().lock();
try {
final Iterator<ThreadContext> iterator = threadContexts.iterator();
if (iterator.hasNext()) {
final String formattedMessage = LoggerMessageFormat.format(message, params);
final String warningHeaderValue = formatWarning(formattedMessage);
assert WARNING_HEADER_PATTERN.matcher(warningHeaderValue).matches();
assert extractWarningValueFromWarningHeader(warningHeaderValue).equals(escapeAndEncode(formattedMessage));
while (iterator.hasNext()) {
final ThreadContext next = iterator.next();
next.addResponseHeader("Warning", warningHeaderValue);
} catch (final IllegalStateException e) {
// ignored; it should be removed shortly
}
}
}

if (log) {
AccessController.doPrivileged(new PrivilegedAction<Void>() {
@SuppressLoggerChecks(reason = "safely delegates to logger")
@Override
public Void run() {
/**
* There should be only one threadContext (in prod env), @see DeprecationLogger#setThreadContext
*/
String opaqueId = getXOpaqueId(threadContexts);

logger.warn(new DeprecatedMessage(message, opaqueId, params));
return null;
}
});
if (log) {
AccessController.doPrivileged(new PrivilegedAction<Void>() {
@SuppressLoggerChecks(reason = "safely delegates to logger")
@Override
public Void run() {
/**
* There should be only one threadContext (in prod env), @see DeprecationLogger#setThreadContext
*/
String opaqueId = getXOpaqueId(threadContexts);
logger.warn(new DeprecatedMessage(message, opaqueId, params));
return null;
}
});
}

}finally {
threadContextLock.readLock().unlock();
}
}

public String getXOpaqueId(Set<ThreadContext> threadContexts) {
return threadContexts.stream()
.filter(t -> t.isClosed() == false)
.filter(t -> t.getHeader(Task.X_OPAQUE_ID) != null)
.findFirst()
.map(t -> t.getHeader(Task.X_OPAQUE_ID))
.orElse("");
threadContextLock.readLock().lock();
try {
for (ThreadContext threadContext : threadContexts) {
assert threadContext.isClosed() == false;
String header = threadContext.getHeader(Task.X_OPAQUE_ID);
if (header != null) {
return header;
}
}
return "";
} finally {
threadContextLock.readLock().unlock();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ public ThreadContext(Settings settings) {
this.maxWarningHeaderSize = SETTING_HTTP_MAX_WARNING_HEADER_SIZE.get(settings).getBytes();
}

/**
* In order to prevent accessing a closed ThreadContext in DeprecationLogger in tests,
* it should be removed from DeprecationLogger.THREAD_CONTEXT static set of ThreadContexts before being called.
* @see org.elasticsearch.common.logging.DeprecationLogger#removeThreadContext(ThreadContext)
*/
@Override
public void close() {
threadLocal.close();
Expand Down
4 changes: 0 additions & 4 deletions server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@
import org.elasticsearch.common.inject.ModulesBuilder;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.logging.NodeAndClusterIdStateListener;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.network.NetworkModule;
Expand Down Expand Up @@ -329,9 +328,6 @@ protected Node(

final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));
resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
// adds the context to the DeprecationLogger so that it does not need to be injected everywhere
DeprecationLogger.setThreadContext(threadPool.getThreadContext());
resourcesToClose.add(() -> DeprecationLogger.removeThreadContext(threadPool.getThreadContext()));

final List<Setting<?>> additionalSettings = new ArrayList<>(pluginsService.getPluginSettings());
final List<String> additionalSettingsFilter = new ArrayList<>(pluginsService.getPluginSettingsFilter());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.SizeValue;
Expand Down Expand Up @@ -218,6 +219,9 @@ public ThreadPool(final Settings settings, final ExecutorBuilder<?>... customBui
TimeValue estimatedTimeInterval = ESTIMATED_TIME_INTERVAL_SETTING.get(settings);
this.cachedTimeThread = new CachedTimeThread(EsExecutors.threadName(settings, "[timer]"), estimatedTimeInterval.millis());
this.cachedTimeThread.start();

// adds the context to the DeprecationLogger so that it does not need to be injected everywhere
DeprecationLogger.setThreadContext(threadContext);
}

/**
Expand Down Expand Up @@ -733,6 +737,7 @@ private static boolean awaitTermination(

@Override
public void close() {
DeprecationLogger.removeThreadContext(threadContext);
threadContext.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import java.security.PrivilegedAction;
import java.security.ProtectionDomain;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -197,19 +196,6 @@ public void testCanRemoveThreadContext() throws IOException {
}
}

public void testIgnoresClosedThreadContext() throws IOException {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removing as we don't expect closed contexts to be present in DeprecatedLogger. It should be removed before closing

ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
Set<ThreadContext> threadContexts = new HashSet<>(1);

threadContexts.add(threadContext);

threadContext.close();

logger.deprecated(threadContexts, "Ignored logger message");

assertTrue(threadContexts.contains(threadContext));
}

public void testSafeWithoutThreadContext() {
logger.deprecated(Collections.emptySet(), "Ignored");
}
Expand All @@ -231,9 +217,9 @@ public void testFailsWhenDoubleSettingSameThreadContext() throws IOException {
}
}

public void testFailsWhenRemovingUnknownThreadContext() throws IOException {
public void testDoNotFailWhenRemovingUnknownThreadContext() throws IOException {
try (ThreadContext threadContext = new ThreadContext(Settings.EMPTY)) {
expectThrows(IllegalStateException.class, () -> DeprecationLogger.removeThreadContext(threadContext));
assertFalse(DeprecationLogger.removeThreadContext(threadContext));
}
}

Expand Down