-
Notifications
You must be signed in to change notification settings - Fork 25.7k
Add settings to control size and count of warning headers in responses #28427
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
9a3f8ad
0fa44fb
48a5768
63dc6b8
e2310f0
9e6de75
4e2fc02
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,10 +23,16 @@ | |
| import org.elasticsearch.common.io.stream.StreamInput; | ||
| import org.elasticsearch.common.io.stream.StreamOutput; | ||
| import org.elasticsearch.common.io.stream.Writeable; | ||
| import org.elasticsearch.common.logging.DeprecationLogger; | ||
| import org.elasticsearch.common.logging.ESLoggerFactory; | ||
| import org.elasticsearch.common.settings.Setting; | ||
| import org.elasticsearch.common.settings.Setting.Property; | ||
| import org.elasticsearch.common.settings.Settings; | ||
| import org.elasticsearch.common.unit.ByteSizeValue; | ||
| import org.elasticsearch.http.HttpTransportSettings; | ||
|
|
||
| import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_COUNT; | ||
| import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_SIZE; | ||
|
|
||
| import java.io.Closeable; | ||
| import java.io.IOException; | ||
|
|
@@ -39,13 +45,14 @@ | |
| import java.util.Set; | ||
| import java.util.concurrent.CancellationException; | ||
| import java.util.concurrent.ExecutionException; | ||
| import java.util.concurrent.FutureTask; | ||
| import java.util.concurrent.RunnableFuture; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
| import java.util.function.Function; | ||
| import java.util.function.Supplier; | ||
| import java.util.stream.Collectors; | ||
| import java.util.stream.Stream; | ||
| import java.nio.charset.StandardCharsets; | ||
|
|
||
|
|
||
| /** | ||
| * A ThreadContext is a map of string headers and a transient map of keyed objects that are associated with | ||
|
|
@@ -81,6 +88,8 @@ public final class ThreadContext implements Closeable, Writeable { | |
| private static final ThreadContextStruct DEFAULT_CONTEXT = new ThreadContextStruct(); | ||
| private final Map<String, String> defaultHeader; | ||
| private final ContextThreadLocal threadLocal; | ||
| private final int maxWarningHeaderCount; | ||
| private final long maxWarningHeaderSize; | ||
|
|
||
| /** | ||
| * Creates a new ThreadContext instance | ||
|
|
@@ -98,6 +107,8 @@ public ThreadContext(Settings settings) { | |
| this.defaultHeader = Collections.unmodifiableMap(defaultHeader); | ||
| } | ||
| threadLocal = new ContextThreadLocal(); | ||
| this.maxWarningHeaderCount = SETTING_HTTP_MAX_WARNING_HEADER_COUNT.get(settings); | ||
| this.maxWarningHeaderSize = SETTING_HTTP_MAX_WARNING_HEADER_SIZE.get(settings).getBytes(); | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -282,7 +293,7 @@ public void addResponseHeader(final String key, final String value) { | |
| * @param uniqueValue the function that produces de-duplication values | ||
| */ | ||
| public void addResponseHeader(final String key, final String value, final Function<String, String> uniqueValue) { | ||
| threadLocal.set(threadLocal.get().putResponse(key, value, uniqueValue)); | ||
| threadLocal.set(threadLocal.get().putResponse(key, value, uniqueValue, maxWarningHeaderCount, maxWarningHeaderSize)); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -359,7 +370,8 @@ private static final class ThreadContextStruct { | |
| private final Map<String, Object> transientHeaders; | ||
| private final Map<String, List<String>> responseHeaders; | ||
| private final boolean isSystemContext; | ||
|
|
||
| private long warningHeadersSize; //saving current warning headers' size not to recalculate the size with every new warning header | ||
| private boolean isWarningLimitReached; | ||
| private ThreadContextStruct(StreamInput in) throws IOException { | ||
| final int numRequest = in.readVInt(); | ||
| Map<String, String> requestHeaders = numRequest == 0 ? Collections.emptyMap() : new HashMap<>(numRequest); | ||
|
|
@@ -371,6 +383,8 @@ private ThreadContextStruct(StreamInput in) throws IOException { | |
| this.responseHeaders = in.readMapOfLists(StreamInput::readString, StreamInput::readString); | ||
| this.transientHeaders = Collections.emptyMap(); | ||
| isSystemContext = false; // we never serialize this it's a transient flag | ||
| this.warningHeadersSize = 0L; | ||
| this.isWarningLimitReached = false; | ||
| } | ||
|
|
||
| private ThreadContextStruct setSystemContext() { | ||
|
|
@@ -387,6 +401,20 @@ private ThreadContextStruct(Map<String, String> requestHeaders, | |
| this.responseHeaders = responseHeaders; | ||
| this.transientHeaders = transientHeaders; | ||
| this.isSystemContext = isSystemContext; | ||
| this.warningHeadersSize = 0L; | ||
| this.isWarningLimitReached = false; | ||
| } | ||
|
|
||
| private ThreadContextStruct(Map<String, String> requestHeaders, | ||
| Map<String, List<String>> responseHeaders, | ||
| Map<String, Object> transientHeaders, boolean isSystemContext, | ||
| long warningHeadersSize, boolean isWarningLimitReached) { | ||
| this.requestHeaders = requestHeaders; | ||
| this.responseHeaders = responseHeaders; | ||
| this.transientHeaders = transientHeaders; | ||
| this.isSystemContext = isSystemContext; | ||
| this.warningHeadersSize = warningHeadersSize; | ||
| this.isWarningLimitReached = isWarningLimitReached; | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -440,30 +468,59 @@ private ThreadContextStruct putResponseHeaders(Map<String, List<String>> headers | |
| return new ThreadContextStruct(requestHeaders, newResponseHeaders, transientHeaders, isSystemContext); | ||
| } | ||
|
|
||
| private ThreadContextStruct putResponse(final String key, final String value, final Function<String, String> uniqueValue) { | ||
| private ThreadContextStruct putResponse(final String key, final String value, final Function<String, String> uniqueValue, | ||
| final int maxWarningHeaderCount, final long maxWarningHeaderSize) { | ||
| assert value != null; | ||
| long newWarningHeaderSize = warningHeadersSize; | ||
| //check if we can add another warning header - if max size within limits | ||
| if (key.equals("Warning")) { | ||
| if (isWarningLimitReached) return this; // can't add warning headers - limit reached | ||
| newWarningHeaderSize += "Warning".getBytes(StandardCharsets.UTF_8).length + value.getBytes(StandardCharsets.UTF_8).length; | ||
| //if size is NOT unbounded AND limit is exceeded | ||
| if ((maxWarningHeaderSize != -1) && (newWarningHeaderSize > maxWarningHeaderSize)) { | ||
| logWarningsLimitReached(); | ||
| return new ThreadContextStruct(requestHeaders, responseHeaders, transientHeaders, | ||
| isSystemContext, newWarningHeaderSize, true); | ||
| } | ||
| } | ||
|
|
||
| final Map<String, List<String>> newResponseHeaders = new HashMap<>(this.responseHeaders); | ||
| final List<String> existingValues = newResponseHeaders.get(key); | ||
|
|
||
| if (existingValues != null) { | ||
| final Set<String> existingUniqueValues = existingValues.stream().map(uniqueValue).collect(Collectors.toSet()); | ||
| assert existingValues.size() == existingUniqueValues.size(); | ||
| if (existingUniqueValues.contains(uniqueValue.apply(value))) { | ||
| return this; | ||
| } | ||
|
|
||
| final List<String> newValues = new ArrayList<>(existingValues); | ||
| newValues.add(value); | ||
|
|
||
| newResponseHeaders.put(key, Collections.unmodifiableList(newValues)); | ||
| } else { | ||
| newResponseHeaders.put(key, Collections.singletonList(value)); | ||
| } | ||
|
|
||
| return new ThreadContextStruct(requestHeaders, newResponseHeaders, transientHeaders, isSystemContext); | ||
| //check if we can add another warning header - if max count within limits | ||
| if ((key.equals("Warning")) && (maxWarningHeaderCount != -1)) { //if count is NOT unbounded, check its limits | ||
| final int warningHeaderCount = newResponseHeaders.containsKey("Warning") ? newResponseHeaders.get("Warning").size() : 0; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think there is a problem here. Imagine that |
||
| if (warningHeaderCount > maxWarningHeaderCount) { | ||
| logWarningsLimitReached(); | ||
| return new ThreadContextStruct(requestHeaders, responseHeaders, transientHeaders, | ||
| isSystemContext, newWarningHeaderSize, true); | ||
| } | ||
| } | ||
| return new ThreadContextStruct(requestHeaders, newResponseHeaders, transientHeaders, | ||
| isSystemContext, newWarningHeaderSize, isWarningLimitReached); | ||
| } | ||
|
|
||
|
|
||
| private void logWarningsLimitReached() { | ||
| final String message = "There were more warnings, but they were dropped as [" + | ||
| HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_COUNT.getKey() + "] or [" + | ||
| HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_SIZE.getKey() + "] were reached!"; | ||
| ESLoggerFactory.getLogger(ThreadContext.class).warn(message); | ||
| } | ||
|
|
||
|
|
||
| private ThreadContextStruct putTransient(String key, Object value) { | ||
| Map<String, Object> newTransient = new HashMap<>(this.transientHeaders); | ||
| if (newTransient.putIfAbsent(key, value) != null) { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that we want to log each time that we drop a warning header, not only the first time for a given request. Also we can be more precise than the current implementation which says one or the other condition is met, but we always know exactly which condition it is so we can help the user more by letting them know.