Merged
@@ -164,10 +164,6 @@ private void registerBuiltinClusterSettings() {
registerClusterDynamicSetting(IndicesTTLService.INDICES_TTL_INTERVAL, Validator.TIME);
registerClusterDynamicSetting(MappingUpdatedAction.INDICES_MAPPING_DYNAMIC_TIMEOUT, Validator.TIME);
registerClusterDynamicSetting(MetaData.SETTING_READ_ONLY, Validator.EMPTY);
registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_FILE_CHUNK_SIZE, Validator.POSITIVE_BYTES_SIZE);
registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_TRANSLOG_OPS, Validator.INTEGER);
registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_TRANSLOG_SIZE, Validator.BYTES_SIZE);
registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_COMPRESS, Validator.EMPTY);
registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_CONCURRENT_STREAMS, Validator.POSITIVE_INTEGER);
registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS, Validator.POSITIVE_INTEGER);
registerClusterDynamicSetting(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC, Validator.BYTES_SIZE);
@@ -746,8 +746,6 @@ public static Builder builder(MetaData metaData) {
/** All known byte-sized cluster settings. */
public static final Set<String> CLUSTER_BYTES_SIZE_SETTINGS = unmodifiableSet(newHashSet(
IndexStoreConfig.INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC,
RecoverySettings.INDICES_RECOVERY_FILE_CHUNK_SIZE,
RecoverySettings.INDICES_RECOVERY_TRANSLOG_SIZE,
RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC));


core/src/main/java/org/elasticsearch/common/io/Streams.java (32 changes: 14 additions & 18 deletions)
@@ -20,6 +20,8 @@
package org.elasticsearch.common.io;

import java.nio.charset.StandardCharsets;

import org.apache.lucene.util.IOUtils;
import org.elasticsearch.common.util.Callback;

import java.io.BufferedReader;
@@ -68,6 +70,7 @@ public static long copy(InputStream in, OutputStream out) throws IOException {
public static long copy(InputStream in, OutputStream out, byte[] buffer) throws IOException {
Objects.requireNonNull(in, "No InputStream specified");
Objects.requireNonNull(out, "No OutputStream specified");
boolean success = false;
try {
long byteCount = 0;
int bytesRead;
@@ -76,17 +79,13 @@ public static long copy(InputStream in, OutputStream out, byte[] buffer) throws
byteCount += bytesRead;
}
out.flush();
success = true;
return byteCount;
} finally {
try {
in.close();
} catch (IOException ex) {
// do nothing
}
try {
out.close();
} catch (IOException ex) {
// do nothing
if (success) {
IOUtils.close(in, out);
} else {
IOUtils.closeWhileHandlingException(in, out);
Contributor: I see, cleaner.

}
}
}
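For context on the change above: the hand-rolled close-and-swallow blocks are replaced by Lucene's IOUtils helpers, so close failures propagate on the success path but are suppressed on the failure path, where they would otherwise mask the original exception. A minimal, self-contained sketch of the same pattern (the method name copyAndClose is illustrative, not part of this PR):

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.lucene.util.IOUtils;

// Minimal sketch of the success-flag close pattern used above.
static long copyAndClose(InputStream in, OutputStream out, byte[] buffer) throws IOException {
    boolean success = false;
    try {
        long byteCount = 0;
        int bytesRead;
        while ((bytesRead = in.read(buffer)) != -1) {
            out.write(buffer, 0, bytesRead);
            byteCount += bytesRead;
        }
        out.flush();
        success = true;
        return byteCount;
    } finally {
        if (success) {
            IOUtils.close(in, out); // close failures propagate
        } else {
            IOUtils.closeWhileHandlingException(in, out); // never mask the original exception
        }
    }
}
```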
@@ -130,6 +129,7 @@ public static void copy(byte[] in, OutputStream out) throws IOException {
public static int copy(Reader in, Writer out) throws IOException {
Objects.requireNonNull(in, "No Reader specified");
Objects.requireNonNull(out, "No Writer specified");
boolean success = false;
try {
int byteCount = 0;
char[] buffer = new char[BUFFER_SIZE];
@@ -139,17 +139,13 @@ public static int copy(Reader in, Writer out) throws IOException {
byteCount += bytesRead;
}
out.flush();
success = true;
return byteCount;
} finally {
try {
in.close();
} catch (IOException ex) {
// do nothing
}
try {
out.close();
} catch (IOException ex) {
// do nothing
if (success) {
IOUtils.close(in, out);
} else {
IOUtils.closeWhileHandlingException(in, out);
}
}
}
@@ -40,10 +40,6 @@
*/
public class RecoverySettings extends AbstractComponent implements Closeable {

public static final String INDICES_RECOVERY_FILE_CHUNK_SIZE = "indices.recovery.file_chunk_size";
public static final String INDICES_RECOVERY_TRANSLOG_OPS = "indices.recovery.translog_ops";
public static final String INDICES_RECOVERY_TRANSLOG_SIZE = "indices.recovery.translog_size";
public static final String INDICES_RECOVERY_COMPRESS = "indices.recovery.compress";
public static final String INDICES_RECOVERY_CONCURRENT_STREAMS = "indices.recovery.concurrent_streams";
public static final String INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS = "indices.recovery.concurrent_small_file_streams";
public static final String INDICES_RECOVERY_MAX_BYTES_PER_SEC = "indices.recovery.max_bytes_per_sec";
@@ -75,11 +71,7 @@ public class RecoverySettings extends AbstractComponent implements Closeable {

public static final long SMALL_FILE_CUTOFF_BYTES = ByteSizeValue.parseBytesSizeValue("5mb", "SMALL_FILE_CUTOFF_BYTES").bytes();

private volatile ByteSizeValue fileChunkSize;

private volatile boolean compress;
private volatile int translogOps;
private volatile ByteSizeValue translogSize;
public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512, ByteSizeUnit.KB);

private volatile int concurrentStreams;
private volatile int concurrentSmallFileStreams;
@@ -94,16 +86,12 @@ public class RecoverySettings extends AbstractComponent implements Closeable {
private volatile TimeValue internalActionTimeout;
private volatile TimeValue internalActionLongTimeout;

private volatile ByteSizeValue chunkSize = DEFAULT_CHUNK_SIZE;
Contributor: add a comment about why this is not final (i.e., it's for testing)

Contributor (author): the setter has a comment

@Inject
public RecoverySettings(Settings settings, NodeSettingsService nodeSettingsService) {
super(settings);

this.fileChunkSize = settings.getAsBytesSize(INDICES_RECOVERY_FILE_CHUNK_SIZE, new ByteSizeValue(512, ByteSizeUnit.KB));
this.translogOps = settings.getAsInt(INDICES_RECOVERY_TRANSLOG_OPS, 1000);
this.translogSize = settings.getAsBytesSize(INDICES_RECOVERY_TRANSLOG_SIZE, new ByteSizeValue(512, ByteSizeUnit.KB));
this.compress = settings.getAsBoolean(INDICES_RECOVERY_COMPRESS, true);

this.retryDelayStateSync = settings.getAsTime(INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC, TimeValue.timeValueMillis(500));
// doesn't have to be fast as nodes are reconnected every 10s by default (see InternalClusterService.ReconnectToNodes)
// and we want to give the master time to remove a faulty node
@@ -132,8 +120,8 @@ public RecoverySettings(Settings settings, NodeSettingsService nodeSettingsServi
rateLimiter = new SimpleRateLimiter(maxBytesPerSec.mbFrac());
}

logger.debug("using max_bytes_per_sec[{}], concurrent_streams [{}], file_chunk_size [{}], translog_size [{}], translog_ops [{}], and compress [{}]",
maxBytesPerSec, concurrentStreams, fileChunkSize, translogSize, translogOps, compress);
logger.debug("using max_bytes_per_sec[{}], concurrent_streams [{}]",
maxBytesPerSec, concurrentStreams);

nodeSettingsService.addListener(new ApplySettings());
}
@@ -144,26 +132,6 @@ public void close() {
ThreadPool.terminate(concurrentSmallFileStreamPool, 1, TimeUnit.SECONDS);
}

public ByteSizeValue fileChunkSize() {
return fileChunkSize;
}

public boolean compress() {
return compress;
}

public int translogOps() {
return translogOps;
}

public ByteSizeValue translogSize() {
return translogSize;
}

public int concurrentStreams() {
return concurrentStreams;
}

public ThreadPoolExecutor concurrentStreamPool() {
return concurrentStreamPool;
}
@@ -196,6 +164,15 @@ public TimeValue internalActionLongTimeout() {
return internalActionLongTimeout;
}

public ByteSizeValue getChunkSize() { return chunkSize; }

void setChunkSize(ByteSizeValue chunkSize) { // only settable for tests
if (chunkSize.bytesAsInt() <= 0) {
throw new IllegalArgumentException("chunkSize must be > 0");
}
this.chunkSize = chunkSize;
}
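Since the setter is package-private and test-only, a test in the same package could exercise it roughly as below (the recoverySettings instance and the test harness are assumed, not shown in this PR):

```java
// Hypothetical test-only usage of the package-private setter.
ByteSizeValue custom = new ByteSizeValue(8, ByteSizeUnit.KB);
recoverySettings.setChunkSize(custom);
assert recoverySettings.getChunkSize().equals(custom);

// Non-positive chunk sizes are rejected:
try {
    recoverySettings.setChunkSize(new ByteSizeValue(0, ByteSizeUnit.BYTES));
    assert false : "expected IllegalArgumentException";
} catch (IllegalArgumentException expected) {
    // chunkSize must be > 0
}
```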


class ApplySettings implements NodeSettingsService.Listener {
@Override
@@ -213,30 +190,6 @@ public void onRefreshSettings(Settings settings) {
}
}

ByteSizeValue fileChunkSize = settings.getAsBytesSize(INDICES_RECOVERY_FILE_CHUNK_SIZE, RecoverySettings.this.fileChunkSize);
if (!fileChunkSize.equals(RecoverySettings.this.fileChunkSize)) {
logger.info("updating [indices.recovery.file_chunk_size] from [{}] to [{}]", RecoverySettings.this.fileChunkSize, fileChunkSize);
RecoverySettings.this.fileChunkSize = fileChunkSize;
}

int translogOps = settings.getAsInt(INDICES_RECOVERY_TRANSLOG_OPS, RecoverySettings.this.translogOps);
if (translogOps != RecoverySettings.this.translogOps) {
logger.info("updating [indices.recovery.translog_ops] from [{}] to [{}]", RecoverySettings.this.translogOps, translogOps);
RecoverySettings.this.translogOps = translogOps;
}

ByteSizeValue translogSize = settings.getAsBytesSize(INDICES_RECOVERY_TRANSLOG_SIZE, RecoverySettings.this.translogSize);
if (!translogSize.equals(RecoverySettings.this.translogSize)) {
logger.info("updating [indices.recovery.translog_size] from [{}] to [{}]", RecoverySettings.this.translogSize, translogSize);
RecoverySettings.this.translogSize = translogSize;
}

boolean compress = settings.getAsBoolean(INDICES_RECOVERY_COMPRESS, RecoverySettings.this.compress);
if (compress != RecoverySettings.this.compress) {
logger.info("updating [indices.recovery.compress] from [{}] to [{}]", RecoverySettings.this.compress, compress);
RecoverySettings.this.compress = compress;
}

int concurrentStreams = settings.getAsInt(INDICES_RECOVERY_CONCURRENT_STREAMS, RecoverySettings.this.concurrentStreams);
if (concurrentStreams != RecoverySettings.this.concurrentStreams) {
logger.info("updating [indices.recovery.concurrent_streams] from [{}] to [{}]", RecoverySettings.this.concurrentStreams, concurrentStreams);
@@ -36,7 +36,9 @@
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.CancellableThreads.Interruptable;
import org.elasticsearch.index.engine.RecoveryEngineException;
@@ -49,6 +51,7 @@
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
@@ -57,6 +60,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.StreamSupport;
@@ -77,9 +81,9 @@ public class RecoverySourceHandler {
private final StartRecoveryRequest request;
private final RecoverySettings recoverySettings;
private final TransportService transportService;
private final int chunkSizeInBytes;

protected final RecoveryResponse response;
private final TransportRequestOptions requestOptions;

private final CancellableThreads cancellableThreads = new CancellableThreads() {
@Override
@@ -106,14 +110,8 @@ public RecoverySourceHandler(final IndexShard shard, final StartRecoveryRequest
this.transportService = transportService;
this.indexName = this.request.shardId().index().name();
this.shardId = this.request.shardId().id();

this.chunkSizeInBytes = recoverySettings.getChunkSize().bytesAsInt();
Contributor: I would just access the recoverySettings directly - test changes will take effect faster.

Contributor (author): I don't think tests should have this level of granularity.

this.response = new RecoveryResponse();
this.requestOptions = TransportRequestOptions.builder()
.withCompress(recoverySettings.compress())
.withType(TransportRequestOptions.Type.RECOVERY)
.withTimeout(recoverySettings.internalActionTimeout())
.build();

}

/**
@@ -218,7 +216,7 @@ public void phase1(final IndexCommit snapshot, final Translog.View translogView)
totalSize += md.length();
}
List<StoreFileMetaData> phase1Files = new ArrayList<>(diff.different.size() + diff.missing.size());
phase1Files.addAll(diff.different);
phase1Files.addAll(diff.missing);
for (StoreFileMetaData md : phase1Files) {
if (request.metadataSnapshot().asMap().containsKey(md.name())) {
@@ -249,7 +247,7 @@ public void phase1(final IndexCommit snapshot, final Translog.View translogView)
});
// How many bytes we've copied since we last called RateLimiter.pause
final AtomicLong bytesSinceLastPause = new AtomicLong();
final Function<StoreFileMetaData, OutputStream> outputStreamFactories = (md) -> new RecoveryOutputStream(md, bytesSinceLastPause, translogView);
final Function<StoreFileMetaData, OutputStream> outputStreamFactories = (md) -> new BufferedOutputStream(new RecoveryOutputStream(md, bytesSinceLastPause, translogView), chunkSizeInBytes);
sendFiles(store, phase1Files.toArray(new StoreFileMetaData[phase1Files.size()]), outputStreamFactories);
cancellableThreads.execute(() -> {
// Send the CLEAN_FILES request, which takes all of the files that
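Worth noting about the outputStreamFactories change above: each RecoveryOutputStream is wrapped in a BufferedOutputStream whose buffer equals the chunk size, so the small writes from Streams.copy accumulate in the buffer and reach RecoveryOutputStream in chunk-sized pieces (plus a final partial chunk on flush/close). Chunking thus falls out of standard java.io buffering rather than custom accounting. A sketch under those assumptions:

```java
// Sketch: buffer size == chunk size, so the wrapped stream sees
// one chunk per buffer flush rather than many small writes.
OutputStream chunked = new BufferedOutputStream(
        new RecoveryOutputStream(md, bytesSinceLastPause, translogView),
        chunkSizeInBytes);
```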
@@ -432,7 +430,7 @@ protected int sendSnapshot(final Translog.Snapshot snapshot) {
}

final TransportRequestOptions recoveryOptions = TransportRequestOptions.builder()
.withCompress(recoverySettings.compress())
.withCompress(true)
.withType(TransportRequestOptions.Type.RECOVERY)
.withTimeout(recoverySettings.internalActionLongTimeout())
.build();
@@ -451,9 +449,9 @@ protected int sendSnapshot(final Translog.Snapshot snapshot) {
size += operation.estimateSize();
totalOperations++;

// Check if this request is past the size or bytes threshold, and
// Check if this request is past the bytes threshold, and
// if so, send it off
if (ops >= recoverySettings.translogOps() || size >= recoverySettings.translogSize().bytes()) {
if (size >= chunkSizeInBytes) {

// don't throttle translog, since we lock for phase3 indexing,
// so we need to move it as fast as possible. Note, since we
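With translog_ops and translog_size removed, the flush trigger for a translog batch is a single byte-size threshold that reuses the same chunk size as file copy. Simplified, the accumulation loop now behaves like the sketch below (sendBatch is a hypothetical stand-in for the actual transport call):

```java
// Simplified from the diff: flush a batch of translog operations
// once its estimated size reaches the chunk size.
final List<Translog.Operation> operations = new ArrayList<>();
long size = 0;
Translog.Operation operation;
while ((operation = snapshot.next()) != null) {
    operations.add(operation);
    size += operation.estimateSize();
    if (size >= chunkSizeInBytes) { // single byte-based threshold; no op-count check anymore
        sendBatch(operations);      // hypothetical helper for the transport request
        operations.clear();
        size = 0;
    }
}
if (operations.isEmpty() == false) {
    sendBatch(operations);          // trailing partial batch
}
```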
@@ -537,7 +535,7 @@ final class RecoveryOutputStream extends OutputStream {

@Override
public final void write(int b) throws IOException {
write(new byte[]{(byte) b}, 0, 1);
throw new UnsupportedOperationException("we can't send single bytes over the wire");
Contributor: thank you :)

}

@Override
@@ -548,6 +546,11 @@ public final void write(byte[] b, int offset, int length) throws IOException {
}

private void sendNextChunk(long position, BytesArray content, boolean lastChunk) throws IOException {
final TransportRequestOptions chunkSendOptions = TransportRequestOptions.builder()
.withCompress(false) // Lucene files are already compressed, so compressing them again won't help much; we're saving the CPU for other things
.withType(TransportRequestOptions.Type.RECOVERY)
.withTimeout(recoverySettings.internalActionTimeout())
.build();
cancellableThreads.execute(() -> {
// Pause using the rate limiter, if desired, to throttle the recovery
final long throttleTimeInNanos;
@@ -577,7 +580,7 @@ private void sendNextChunk(long position, BytesArray content, boolean lastChunk)
* see how many translog ops we accumulate while copying files across the network. A future optimization
* would be to restart the file copy again (new deltas) if too many translog ops are piling up.
*/
throttleTimeInNanos), requestOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
throttleTimeInNanos), chunkSendOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
});
if (shard.state() == IndexShardState.CLOSED) { // check if the shard got closed on us
throw new IndexShardClosedException(request.shardId());
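Note the asymmetry this PR settles on: translog batches are always sent compressed (recoveryOptions above uses withCompress(true), since source documents compress well), while file chunks are sent uncompressed (chunkSendOptions uses withCompress(false), since Lucene files are already compressed and recompressing them mostly burns CPU). Side by side, as taken from the diff:

```java
// Translog operations: compress; source documents compress well.
TransportRequestOptions translogOptions = TransportRequestOptions.builder()
        .withCompress(true)
        .withType(TransportRequestOptions.Type.RECOVERY)
        .withTimeout(recoverySettings.internalActionLongTimeout())
        .build();

// File chunks: don't compress; Lucene files are already compressed.
TransportRequestOptions chunkOptions = TransportRequestOptions.builder()
        .withCompress(false)
        .withType(TransportRequestOptions.Type.RECOVERY)
        .withTimeout(recoverySettings.internalActionTimeout())
        .build();
```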
@@ -670,9 +673,10 @@ Future<Void>[] asyncSendFiles(Store store, StoreFileMetaData[] files, Function<S
pool = recoverySettings.concurrentSmallFileStreamPool();
}
Future<Void> future = pool.submit(() -> {
try (final OutputStream outputStream = outputStreamFactory.apply(md);
final IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE)) {
Streams.copy(new InputStreamIndexInput(indexInput, md.length()), outputStream);
try (final IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE)) {
// it's fine that only the indexInput is in the try-with-resources block. The copy method handles
// exceptions during close correctly and doesn't hide the original exception.
Streams.copy(new InputStreamIndexInput(indexInput, md.length()), outputStreamFactory.apply(md));
}
return null;
});
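The comment in this hunk leans on the Streams.copy contract established earlier in this PR: copy closes both of its stream arguments, propagating close failures on success and suppressing them on error, so only the IndexInput needs try-with-resources here and the OutputStream from the factory is safe to pass inline:

```java
// Safe: Streams.copy closes both the input wrapper and the output stream,
// without hiding an in-flight exception behind a close failure.
Streams.copy(new InputStreamIndexInput(indexInput, md.length()),
        outputStreamFactory.apply(md));
```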