diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java index b2e793ba0abc0..2b7786fdb1d70 100644 --- a/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java +++ b/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java @@ -164,10 +164,6 @@ private void registerBuiltinClusterSettings() { registerClusterDynamicSetting(IndicesTTLService.INDICES_TTL_INTERVAL, Validator.TIME); registerClusterDynamicSetting(MappingUpdatedAction.INDICES_MAPPING_DYNAMIC_TIMEOUT, Validator.TIME); registerClusterDynamicSetting(MetaData.SETTING_READ_ONLY, Validator.EMPTY); - registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_FILE_CHUNK_SIZE, Validator.POSITIVE_BYTES_SIZE); - registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_TRANSLOG_OPS, Validator.INTEGER); - registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_TRANSLOG_SIZE, Validator.BYTES_SIZE); - registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_COMPRESS, Validator.EMPTY); registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_CONCURRENT_STREAMS, Validator.POSITIVE_INTEGER); registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS, Validator.POSITIVE_INTEGER); registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC, Validator.BYTES_SIZE); diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java index 68fa6e45d8802..751f8a09ea5ea 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java @@ -746,8 +746,6 @@ public static Builder builder(MetaData metaData) { /** All known byte-sized cluster settings. */ public static final Set CLUSTER_BYTES_SIZE_SETTINGS = unmodifiableSet(newHashSet( IndexStoreConfig.INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC, - RecoverySettings.INDICES_RECOVERY_FILE_CHUNK_SIZE, - RecoverySettings.INDICES_RECOVERY_TRANSLOG_SIZE, RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC)); diff --git a/core/src/main/java/org/elasticsearch/common/io/Streams.java b/core/src/main/java/org/elasticsearch/common/io/Streams.java index 8adf0919e5019..36b1d9445b00f 100644 --- a/core/src/main/java/org/elasticsearch/common/io/Streams.java +++ b/core/src/main/java/org/elasticsearch/common/io/Streams.java @@ -20,6 +20,8 @@ package org.elasticsearch.common.io; import java.nio.charset.StandardCharsets; + +import org.apache.lucene.util.IOUtils; import org.elasticsearch.common.util.Callback; import java.io.BufferedReader; @@ -68,6 +70,7 @@ public static long copy(InputStream in, OutputStream out) throws IOException { public static long copy(InputStream in, OutputStream out, byte[] buffer) throws IOException { Objects.requireNonNull(in, "No InputStream specified"); Objects.requireNonNull(out, "No OutputStream specified"); + boolean success = false; try { long byteCount = 0; int bytesRead; @@ -76,17 +79,13 @@ public static long copy(InputStream in, OutputStream out, byte[] buffer) throws byteCount += bytesRead; } out.flush(); + success = true; return byteCount; } finally { - try { - in.close(); - } catch (IOException ex) { - // do nothing - } - try { - out.close(); - } catch (IOException ex) { - // do nothing + if (success) { + IOUtils.close(in, out); + } else { + IOUtils.closeWhileHandlingException(in, out); } } } @@ -130,6 +129,7 @@ public static void copy(byte[] in, OutputStream out) throws IOException { public static int copy(Reader in, Writer out) throws IOException { Objects.requireNonNull(in, "No Reader specified"); Objects.requireNonNull(out, "No Writer specified"); + boolean success = false; try { int byteCount = 0; char[] buffer = new char[BUFFER_SIZE]; @@ -139,17 +139,13 @@ public static int copy(Reader in, Writer out) throws IOException { byteCount += bytesRead; } out.flush(); + success = true; return byteCount; } finally { - try { - in.close(); - } catch (IOException ex) { - // do nothing - } - try { - out.close(); - } catch (IOException ex) { - // do nothing + if (success) { + IOUtils.close(in, out); + } else { + IOUtils.closeWhileHandlingException(in, out); } } } diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java index 749ba4f336058..6db38d59e857d 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java @@ -40,10 +40,6 @@ */ public class RecoverySettings extends AbstractComponent implements Closeable { - public static final String INDICES_RECOVERY_FILE_CHUNK_SIZE = "indices.recovery.file_chunk_size"; - public static final String INDICES_RECOVERY_TRANSLOG_OPS = "indices.recovery.translog_ops"; - public static final String INDICES_RECOVERY_TRANSLOG_SIZE = "indices.recovery.translog_size"; - public static final String INDICES_RECOVERY_COMPRESS = "indices.recovery.compress"; public static final String INDICES_RECOVERY_CONCURRENT_STREAMS = "indices.recovery.concurrent_streams"; public static final String INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS = "indices.recovery.concurrent_small_file_streams"; public static final String INDICES_RECOVERY_MAX_BYTES_PER_SEC = "indices.recovery.max_bytes_per_sec"; @@ -75,11 +71,7 @@ public class RecoverySettings extends AbstractComponent implements Closeable { public static final long SMALL_FILE_CUTOFF_BYTES = ByteSizeValue.parseBytesSizeValue("5mb", "SMALL_FILE_CUTOFF_BYTES").bytes(); - private volatile ByteSizeValue fileChunkSize; - - private volatile boolean compress; - private volatile int translogOps; - private volatile ByteSizeValue translogSize; + public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512, ByteSizeUnit.KB); private volatile int concurrentStreams; private volatile int concurrentSmallFileStreams; @@ -94,16 +86,12 @@ public class RecoverySettings extends AbstractComponent implements Closeable { private volatile TimeValue internalActionTimeout; private volatile TimeValue internalActionLongTimeout; + private volatile ByteSizeValue chunkSize = DEFAULT_CHUNK_SIZE; @Inject public RecoverySettings(Settings settings, NodeSettingsService nodeSettingsService) { super(settings); - this.fileChunkSize = settings.getAsBytesSize(INDICES_RECOVERY_FILE_CHUNK_SIZE, new ByteSizeValue(512, ByteSizeUnit.KB)); - this.translogOps = settings.getAsInt(INDICES_RECOVERY_TRANSLOG_OPS, 1000); - this.translogSize = settings.getAsBytesSize(INDICES_RECOVERY_TRANSLOG_SIZE, new ByteSizeValue(512, ByteSizeUnit.KB)); - this.compress = settings.getAsBoolean(INDICES_RECOVERY_COMPRESS, true); - this.retryDelayStateSync = settings.getAsTime(INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC, TimeValue.timeValueMillis(500)); // doesn't have to be fast as nodes are reconnected every 10s by default (see InternalClusterService.ReconnectToNodes) // and we want to give the master time to remove a faulty node @@ -132,8 +120,8 @@ public RecoverySettings(Settings settings, NodeSettingsService nodeSettingsServi rateLimiter = new SimpleRateLimiter(maxBytesPerSec.mbFrac()); } - logger.debug("using max_bytes_per_sec[{}], concurrent_streams [{}], file_chunk_size [{}], translog_size [{}], translog_ops [{}], and compress [{}]", - maxBytesPerSec, concurrentStreams, fileChunkSize, translogSize, translogOps, compress); + logger.debug("using max_bytes_per_sec[{}], concurrent_streams [{}]", + maxBytesPerSec, concurrentStreams); nodeSettingsService.addListener(new ApplySettings()); } @@ -144,26 +132,6 @@ public void close() { ThreadPool.terminate(concurrentSmallFileStreamPool, 1, TimeUnit.SECONDS); } - public ByteSizeValue fileChunkSize() { - return fileChunkSize; - } - - public boolean compress() { - return compress; - } - - public int translogOps() { - return translogOps; - } - - public ByteSizeValue translogSize() { - return translogSize; - } - - public int concurrentStreams() { - return concurrentStreams; - } - public ThreadPoolExecutor concurrentStreamPool() { return concurrentStreamPool; } @@ -196,6 +164,15 @@ public TimeValue internalActionLongTimeout() { return internalActionLongTimeout; } + public ByteSizeValue getChunkSize() { return chunkSize; } + + void setChunkSize(ByteSizeValue chunkSize) { // only settable for tests + if (chunkSize.bytesAsInt() <= 0) { + throw new IllegalArgumentException("chunkSize must be > 0"); + } + this.chunkSize = chunkSize; + } + class ApplySettings implements NodeSettingsService.Listener { @Override @@ -213,30 +190,6 @@ public void onRefreshSettings(Settings settings) { } } - ByteSizeValue fileChunkSize = settings.getAsBytesSize(INDICES_RECOVERY_FILE_CHUNK_SIZE, RecoverySettings.this.fileChunkSize); - if (!fileChunkSize.equals(RecoverySettings.this.fileChunkSize)) { - logger.info("updating [indices.recovery.file_chunk_size] from [{}] to [{}]", RecoverySettings.this.fileChunkSize, fileChunkSize); - RecoverySettings.this.fileChunkSize = fileChunkSize; - } - - int translogOps = settings.getAsInt(INDICES_RECOVERY_TRANSLOG_OPS, RecoverySettings.this.translogOps); - if (translogOps != RecoverySettings.this.translogOps) { - logger.info("updating [indices.recovery.translog_ops] from [{}] to [{}]", RecoverySettings.this.translogOps, translogOps); - RecoverySettings.this.translogOps = translogOps; - } - - ByteSizeValue translogSize = settings.getAsBytesSize(INDICES_RECOVERY_TRANSLOG_SIZE, RecoverySettings.this.translogSize); - if (!translogSize.equals(RecoverySettings.this.translogSize)) { - logger.info("updating [indices.recovery.translog_size] from [{}] to [{}]", RecoverySettings.this.translogSize, translogSize); - RecoverySettings.this.translogSize = translogSize; - } - - boolean compress = settings.getAsBoolean(INDICES_RECOVERY_COMPRESS, RecoverySettings.this.compress); - if (compress != RecoverySettings.this.compress) { - logger.info("updating [indices.recovery.compress] from [{}] to [{}]", RecoverySettings.this.compress, compress); - RecoverySettings.this.compress = compress; - } - int concurrentStreams = settings.getAsInt(INDICES_RECOVERY_CONCURRENT_STREAMS, RecoverySettings.this.concurrentStreams); if (concurrentStreams != RecoverySettings.this.concurrentStreams) { logger.info("updating [indices.recovery.concurrent_streams] from [{}] to [{}]", RecoverySettings.this.concurrentStreams, concurrentStreams); diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 73193161d12dd..4057af0084114 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -36,7 +36,9 @@ import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.lucene.store.InputStreamIndexInput; +import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.util.CancellableThreads.Interruptable; import org.elasticsearch.index.engine.RecoveryEngineException; @@ -49,6 +51,7 @@ import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; +import java.io.BufferedOutputStream; import java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; @@ -57,6 +60,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.stream.StreamSupport; @@ -77,9 +81,9 @@ public class RecoverySourceHandler { private final StartRecoveryRequest request; private final RecoverySettings recoverySettings; private final TransportService transportService; + private final int chunkSizeInBytes; protected final RecoveryResponse response; - private final TransportRequestOptions requestOptions; private final CancellableThreads cancellableThreads = new CancellableThreads() { @Override @@ -106,14 +110,8 @@ public RecoverySourceHandler(final IndexShard shard, final StartRecoveryRequest this.transportService = transportService; this.indexName = this.request.shardId().index().name(); this.shardId = this.request.shardId().id(); - + this.chunkSizeInBytes = recoverySettings.getChunkSize().bytesAsInt(); this.response = new RecoveryResponse(); - this.requestOptions = TransportRequestOptions.builder() - .withCompress(recoverySettings.compress()) - .withType(TransportRequestOptions.Type.RECOVERY) - .withTimeout(recoverySettings.internalActionTimeout()) - .build(); - } /** @@ -218,7 +216,7 @@ public void phase1(final IndexCommit snapshot, final Translog.View translogView) totalSize += md.length(); } List phase1Files = new ArrayList<>(diff.different.size() + diff.missing.size()); - phase1Files.addAll(diff.different); + phase1Files.addAll(diff.different); phase1Files.addAll(diff.missing); for (StoreFileMetaData md : phase1Files) { if (request.metadataSnapshot().asMap().containsKey(md.name())) { @@ -249,7 +247,7 @@ public void phase1(final IndexCommit snapshot, final Translog.View translogView) }); // How many bytes we've copied since we last called RateLimiter.pause final AtomicLong bytesSinceLastPause = new AtomicLong(); - final Function outputStreamFactories = (md) -> new RecoveryOutputStream(md, bytesSinceLastPause, translogView); + final Function outputStreamFactories = (md) -> new BufferedOutputStream(new RecoveryOutputStream(md, bytesSinceLastPause, translogView), chunkSizeInBytes); sendFiles(store, phase1Files.toArray(new StoreFileMetaData[phase1Files.size()]), outputStreamFactories); cancellableThreads.execute(() -> { // Send the CLEAN_FILES request, which takes all of the files that @@ -432,7 +430,7 @@ protected int sendSnapshot(final Translog.Snapshot snapshot) { } final TransportRequestOptions recoveryOptions = TransportRequestOptions.builder() - .withCompress(recoverySettings.compress()) + .withCompress(true) .withType(TransportRequestOptions.Type.RECOVERY) .withTimeout(recoverySettings.internalActionLongTimeout()) .build(); @@ -451,9 +449,9 @@ protected int sendSnapshot(final Translog.Snapshot snapshot) { size += operation.estimateSize(); totalOperations++; - // Check if this request is past the size or bytes threshold, and + // Check if this request is past bytes threshold, and // if so, send it off - if (ops >= recoverySettings.translogOps() || size >= recoverySettings.translogSize().bytes()) { + if (size >= chunkSizeInBytes) { // don't throttle translog, since we lock for phase3 indexing, // so we need to move it as fast as possible. Note, since we @@ -537,7 +535,7 @@ final class RecoveryOutputStream extends OutputStream { @Override public final void write(int b) throws IOException { - write(new byte[]{(byte) b}, 0, 1); + throw new UnsupportedOperationException("we can't send single bytes over the wire"); } @Override @@ -548,6 +546,11 @@ public final void write(byte[] b, int offset, int length) throws IOException { } private void sendNextChunk(long position, BytesArray content, boolean lastChunk) throws IOException { + final TransportRequestOptions chunkSendOptions = TransportRequestOptions.builder() + .withCompress(false) // lucene files are already compressed and therefore compressing this won't really help much so we are safing the cpu for other things + .withType(TransportRequestOptions.Type.RECOVERY) + .withTimeout(recoverySettings.internalActionTimeout()) + .build(); cancellableThreads.execute(() -> { // Pause using the rate limiter, if desired, to throttle the recovery final long throttleTimeInNanos; @@ -577,7 +580,7 @@ private void sendNextChunk(long position, BytesArray content, boolean lastChunk) * see how many translog ops we accumulate while copying files across the network. A future optimization * would be in to restart file copy again (new deltas) if we have too many translog ops are piling up. */ - throttleTimeInNanos), requestOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); + throttleTimeInNanos), chunkSendOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); }); if (shard.state() == IndexShardState.CLOSED) { // check if the shard got closed on us throw new IndexShardClosedException(request.shardId()); @@ -670,9 +673,10 @@ Future[] asyncSendFiles(Store store, StoreFileMetaData[] files, Function future = pool.submit(() -> { - try (final OutputStream outputStream = outputStreamFactory.apply(md); - final IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE)) { - Streams.copy(new InputStreamIndexInput(indexInput, md.length()), outputStream); + try (final IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE)) { + // it's fine that we are only having the indexInput int he try/with block. The copy methods handles + // exceptions during close correctly and doesn't hide the original exception. + Streams.copy(new InputStreamIndexInput(indexInput, md.length()), outputStreamFactory.apply(md)); } return null; }); diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index 5ece413f7967b..4cf60289a18aa 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -138,21 +138,25 @@ private void assertOnGoingRecoveryState(RecoveryState state, int shardId, Type t } private void slowDownRecovery(ByteSizeValue shardSize) { - long chunkSize = shardSize.bytes() / 10; + long chunkSize = Math.max(1, shardSize.bytes() / 10); + for(RecoverySettings settings : internalCluster().getInstances(RecoverySettings.class)) { + setChunkSize(settings, new ByteSizeValue(chunkSize, ByteSizeUnit.BYTES)); + } assertTrue(client().admin().cluster().prepareUpdateSettings() .setTransientSettings(Settings.builder() // one chunk per sec.. .put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC, chunkSize, ByteSizeUnit.BYTES) - .put(RecoverySettings.INDICES_RECOVERY_FILE_CHUNK_SIZE, chunkSize, ByteSizeUnit.BYTES) ) .get().isAcknowledged()); } private void restoreRecoverySpeed() { + for(RecoverySettings settings : internalCluster().getInstances(RecoverySettings.class)) { + setChunkSize(settings, RecoverySettings.DEFAULT_CHUNK_SIZE); + } assertTrue(client().admin().cluster().prepareUpdateSettings() .setTransientSettings(Settings.builder() .put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC, "20mb") - .put(RecoverySettings.INDICES_RECOVERY_FILE_CHUNK_SIZE, "512kb") ) .get().isAcknowledged()); } @@ -631,4 +635,8 @@ public void sendRequest(DiscoveryNode node, long requestId, String action, Trans transport.sendRequest(node, requestId, action, request, options); } } + + public static void setChunkSize(RecoverySettings recoverySettings, ByteSizeValue chunksSize) { + recoverySettings.setChunkSize(chunksSize); + } } diff --git a/core/src/test/java/org/elasticsearch/recovery/RecoverySettingsTests.java b/core/src/test/java/org/elasticsearch/recovery/RecoverySettingsTests.java index b0288042eccf0..4bcbb8c8ee722 100644 --- a/core/src/test/java/org/elasticsearch/recovery/RecoverySettingsTests.java +++ b/core/src/test/java/org/elasticsearch/recovery/RecoverySettingsTests.java @@ -32,24 +32,6 @@ protected boolean resetNodeAfterTest() { } public void testAllSettingsAreDynamicallyUpdatable() { - innerTestSettings(RecoverySettings.INDICES_RECOVERY_FILE_CHUNK_SIZE, randomIntBetween(1, 200), ByteSizeUnit.BYTES, new Validator() { - @Override - public void validate(RecoverySettings recoverySettings, int expectedValue) { - assertEquals(expectedValue, recoverySettings.fileChunkSize().bytesAsInt()); - } - }); - innerTestSettings(RecoverySettings.INDICES_RECOVERY_TRANSLOG_OPS, randomIntBetween(1, 200), new Validator() { - @Override - public void validate(RecoverySettings recoverySettings, int expectedValue) { - assertEquals(expectedValue, recoverySettings.translogOps()); - } - }); - innerTestSettings(RecoverySettings.INDICES_RECOVERY_TRANSLOG_SIZE, randomIntBetween(1, 200), ByteSizeUnit.BYTES, new Validator() { - @Override - public void validate(RecoverySettings recoverySettings, int expectedValue) { - assertEquals(expectedValue, recoverySettings.translogSize().bytesAsInt()); - } - }); innerTestSettings(RecoverySettings.INDICES_RECOVERY_CONCURRENT_STREAMS, randomIntBetween(1, 200), new Validator() { @Override public void validate(RecoverySettings recoverySettings, int expectedValue) { @@ -98,13 +80,6 @@ public void validate(RecoverySettings recoverySettings, int expectedValue) { assertEquals(expectedValue, recoverySettings.internalActionLongTimeout().millis()); } }); - - innerTestSettings(RecoverySettings.INDICES_RECOVERY_COMPRESS, false, new Validator() { - @Override - public void validate(RecoverySettings recoverySettings, boolean expectedValue) { - assertEquals(expectedValue, recoverySettings.compress()); - } - }); } private static class Validator { diff --git a/core/src/test/java/org/elasticsearch/recovery/TruncatedRecoveryIT.java b/core/src/test/java/org/elasticsearch/recovery/TruncatedRecoveryIT.java index d94f72ea80ff6..4d111469505fb 100644 --- a/core/src/test/java/org/elasticsearch/recovery/TruncatedRecoveryIT.java +++ b/core/src/test/java/org/elasticsearch/recovery/TruncatedRecoveryIT.java @@ -31,6 +31,7 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.discovery.Discovery; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.indices.recovery.IndexRecoveryIT; import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest; import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.indices.recovery.RecoveryTarget; @@ -58,13 +59,6 @@ @ESIntegTestCase.ClusterScope(numDataNodes = 2, numClientNodes = 0, scope = ESIntegTestCase.Scope.TEST) @SuppressCodecs("*") // test relies on exact file extensions public class TruncatedRecoveryIT extends ESIntegTestCase { - @Override - protected Settings nodeSettings(int nodeOrdinal) { - Settings.Builder builder = Settings.builder() - .put(super.nodeSettings(nodeOrdinal)) - .put(RecoverySettings.INDICES_RECOVERY_FILE_CHUNK_SIZE, new ByteSizeValue(randomIntBetween(50, 300), ByteSizeUnit.BYTES)); - return builder.build(); - } @Override protected Collection> nodePlugins() { @@ -78,6 +72,10 @@ protected Collection> nodePlugins() { * Later we allow full recovery to ensure we can still recover and don't run into corruptions. */ public void testCancelRecoveryAndResume() throws Exception { + for(RecoverySettings settings : internalCluster().getInstances(RecoverySettings.class)) { + IndexRecoveryIT.setChunkSize(settings, new ByteSizeValue(randomIntBetween(50, 300), ByteSizeUnit.BYTES)); + } + NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().get(); List dataNodeStats = new ArrayList<>(); for (NodeStats stat : nodeStats.getNodes()) { diff --git a/test-framework/src/main/java/org/elasticsearch/test/ESBackcompatTestCase.java b/test-framework/src/main/java/org/elasticsearch/test/ESBackcompatTestCase.java index 47e163a6291b6..3e5c903a1ba9f 100644 --- a/test-framework/src/main/java/org/elasticsearch/test/ESBackcompatTestCase.java +++ b/test-framework/src/main/java/org/elasticsearch/test/ESBackcompatTestCase.java @@ -131,16 +131,6 @@ private static Path backwardsCompatibilityPath() { return file; } - @Override - protected Settings.Builder setRandomIndexSettings(Random random, Settings.Builder builder) { - if (globalCompatibilityVersion().before(Version.V_1_3_2)) { - // if we test against nodes before 1.3.2 we disable all the compression due to a known bug - // see #7210 - builder.put(RecoverySettings.INDICES_RECOVERY_COMPRESS, false); - } - return builder; - } - /** * Retruns the tests compatibility version. */ @@ -250,13 +240,6 @@ protected Settings commonNodeSettings(int nodeOrdinal) { Settings.Builder builder = Settings.builder().put(requiredSettings()); builder.put(TransportModule.TRANSPORT_TYPE_KEY, "netty"); // run same transport / disco as external builder.put("node.mode", "network"); - - if (compatibilityVersion().before(Version.V_1_3_2)) { - // if we test against nodes before 1.3.2 we disable all the compression due to a known bug - // see #7210 - builder.put(Transport.TransportSettings.TRANSPORT_TCP_COMPRESS, false) - .put(RecoverySettings.INDICES_RECOVERY_COMPRESS, false); - } return builder.build(); } diff --git a/test-framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test-framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index b86c868969904..7ae3226b66a7a 100644 --- a/test-framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test-framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -447,10 +447,6 @@ private Settings getRandomNodeSettings(long seed) { } } - if (random.nextBoolean()) { - builder.put(RecoverySettings.INDICES_RECOVERY_COMPRESS, random.nextBoolean()); - } - if (random.nextBoolean()) { builder.put(NettyTransport.PING_SCHEDULE, RandomInts.randomIntBetween(random, 100, 2000) + "ms"); } @@ -1554,7 +1550,7 @@ public synchronized Async> startNodesAsync(final int numNodes, fina for (int i = 0; i < numNodes; i++) { asyncs.add(startNodeAsync(settings, version)); } - + return () -> { List ids = new ArrayList<>(); for (Async async : asyncs) {