diff --git a/docs/changelog/68649.yaml b/docs/changelog/68649.yaml new file mode 100644 index 0000000000000..b04a4d7a64eb9 --- /dev/null +++ b/docs/changelog/68649.yaml @@ -0,0 +1,6 @@ +pr: 68649 +summary: Prevent `ThreadContext` header leak when sending response +area: Infra/Core +type: bug +issues: + - 68278 diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java index 9a592472d1b3a..40c98bf20b609 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequestHandler.java @@ -14,6 +14,7 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.http.HttpPipelinedRequest; +import org.elasticsearch.transport.Transports; @ChannelHandler.Sharable class Netty4HttpRequestHandler extends SimpleChannelInboundHandler { @@ -26,6 +27,8 @@ class Netty4HttpRequestHandler extends SimpleChannelInboundHandler queuedWrites = new ArrayDeque<>(); + private final ThreadContext threadContext; private WriteOperation currentWrite; - public Netty4WriteThrottlingHandler() {} + public Netty4WriteThrottlingHandler(ThreadContext threadContext) { + this.threadContext = threadContext; + } @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { assert msg instanceof ByteBuf; + assert Transports.assertDefaultThreadContext(threadContext); + assert Transports.assertTransportThread(); final boolean queued = queuedWrites.offer(new WriteOperation((ByteBuf) msg, promise)); assert queued; } diff --git a/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java b/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java index 756fcc668788c..7deb9aaf19b0f 100644 --- a/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java +++ b/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java @@ -492,4 +492,8 @@ private static ActionListener earlyResponseListener(HttpRequest request, H return ActionListener.noop(); } } + + public ThreadPool getThreadPool() { + return threadPool; + } } diff --git a/server/src/main/java/org/elasticsearch/http/DefaultRestChannel.java b/server/src/main/java/org/elasticsearch/http/DefaultRestChannel.java index 7e5b8b2c63d8b..6f82a90fb9911 100644 --- a/server/src/main/java/org/elasticsearch/http/DefaultRestChannel.java +++ b/server/src/main/java/org/elasticsearch/http/DefaultRestChannel.java @@ -131,7 +131,9 @@ public void sendResponse(RestResponse restResponse) { addCookies(httpResponse); ActionListener listener = ActionListener.wrap(() -> Releasables.close(toClose)); - httpChannel.sendResponse(httpResponse, listener); + try (ThreadContext.StoredContext existing = threadContext.stashContext()) { + httpChannel.sendResponse(httpResponse, listener); + } success = true; } finally { if (success == false) {