diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java index 1143e7b67f77a..71fbfd5f2478a 100644 --- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java +++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java @@ -34,9 +34,6 @@ import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpUtil; import io.netty.handler.codec.http.LastHttpContent; -import io.netty.handler.logging.ByteBufFormat; -import io.netty.handler.logging.LogLevel; -import io.netty.handler.logging.LoggingHandler; import io.netty.handler.stream.ChunkedStream; import io.netty.handler.stream.ChunkedWriteHandler; @@ -49,7 +46,6 @@ import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.collect.Iterators; @@ -77,7 +73,6 @@ import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.MockLog; -import org.elasticsearch.test.junit.annotations.TestIssueLogging; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.rest.ObjectPath; import org.elasticsearch.transport.Transports; @@ -528,13 +523,8 @@ private void assertHttpBodyLogging(Consumer test) throws Exceptio } } - @TestIssueLogging( - issueUrl = "https://github.com/elastic/elasticsearch/issues/144579", - value = "org.elasticsearch.http.netty4.Netty4IncrementalRequestHandlingIT:DEBUG" - + ",org.elasticsearch.transport.TransportService.tracer:TRACE" - ) public void testBulkIndexingRequestSplitting() throws Exception { - final var watermarkBytes = between(100, 200); + final var watermarkBytes = between(100, 2000); final var tinyNode = internalCluster().startCoordinatingOnlyNode( Settings.builder() .put(IndexingPressure.SPLIT_BULK_LOW_WATERMARK.getKey(), ByteSizeValue.ofBytes(watermarkBytes)) @@ -551,14 +541,6 @@ public void testBulkIndexingRequestSplitting() throws Exception { channel.writeAndFlush(request); final var indexName = randomIndexName(); - final var clusterStateLoggingListener = ClusterServiceUtils.addTemporaryStateListener( - internalCluster().getCurrentMasterNodeInstance(ClusterService.class), - cs -> { - logger.info("cluster state: {}", cs); - return false; - }, - TimeValue.ONE_HOUR - ); final var indexCreatedListener = ClusterServiceUtils.addTemporaryStateListener( cs -> Iterators.filter( cs.metadata().indicesAllProjects().iterator(), @@ -570,24 +552,28 @@ public void testBulkIndexingRequestSplitting() throws Exception { final var valueLength = between(10, 30); final var docSizeBytes = "{'field':''}".length() + valueLength; - final var itemCount = between(watermarkBytes / docSizeBytes + 1, 300); // enough to split at least once + final var minItemCount = watermarkBytes / docSizeBytes + 1; + final var itemCount = between(minItemCount, minItemCount * 2); // enough to split at least once assertThat(itemCount * docSizeBytes, greaterThan(watermarkBytes)); for (int i = 0; i < itemCount; i++) { channel.write(new DefaultHttpContent(Unpooled.wrappedBuffer(Strings.format(""" {"index":{"_index":"%s"}} {"field":"%s"} """, indexName, randomAlphaOfLength(valueLength)).getBytes(StandardCharsets.UTF_8)))); + + if (i == minItemCount && randomBoolean()) { + channel.flush(); + if (randomBoolean()) { + safeAwait(indexCreatedListener); + } + } } channel.flush(); safeAwait(indexCreatedListener); // index must be created before we finish sending the request - logger.info("--> completing request"); channel.writeAndFlush(new DefaultLastHttpContent()); - logger.info("--> awaiting response"); final var response = clientContext.getNextResponse(); - logger.info("--> received response"); - clusterStateLoggingListener.onResponse(null); try { assertEquals(RestStatus.OK.getStatus(), response.status().code()); final ObjectPath responseBody; @@ -667,9 +653,6 @@ private ClientContext newClientContext(String nodeName, Consumer exce @Override protected void initChannel(SocketChannel ch) { var p = ch.pipeline(); - if (logger.isDebugEnabled()) { - p.addLast(new LoggingHandler(Netty4IncrementalRequestHandlingIT.class, LogLevel.DEBUG, ByteBufFormat.HEX_DUMP)); - } p.addLast(new HttpClientCodec()); p.addLast(new HttpObjectAggregator(ByteSizeUnit.MB.toIntBytes(4))); p.addLast(new SimpleChannelInboundHandler() {