Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@

import java.io.Closeable;
import java.util.List;
import java.util.Set;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.logging.LogManager.getLogger;
Expand All @@ -35,32 +36,34 @@
class DlmFrozenTransitionExecutor implements Closeable {

private static final Logger logger = getLogger(DlmFrozenTransitionExecutor.class);

private static final String EXECUTOR_NAME = "dlm-frozen-transition";

private final Set<String> runningTransitions;
private final int maxConcurrency;
private final Map<String, Boolean> submittedTransitions;
private final ExecutorService executor;
private final int maxConcurrency;
private final int maxQueueSize;

DlmFrozenTransitionExecutor(int maxConcurrency, Settings settings) {
this.runningTransitions = ConcurrentHashMap.newKeySet(maxConcurrency);
DlmFrozenTransitionExecutor(int maxConcurrency, int maxQueueSize, Settings settings) {
this.maxConcurrency = maxConcurrency;
this.executor = EsExecutors.newFixed(
EXECUTOR_NAME,
maxConcurrency,
-1,
EsExecutors.daemonThreadFactory(settings, EXECUTOR_NAME),
new ThreadContext(settings),
EsExecutors.TaskTrackingConfig.DEFAULT
);
this.maxQueueSize = maxQueueSize;
this.submittedTransitions = new ConcurrentHashMap<>(maxQueueSize);
ThreadFactory esThreadFactory = EsExecutors.daemonThreadFactory(settings, EXECUTOR_NAME);
this.executor = EsExecutors.newFixed(EXECUTOR_NAME, maxConcurrency, maxQueueSize, r -> {
Thread thread = esThreadFactory.newThread(r);
if (r instanceof WrappedDlmFrozenTransitionRunnable runnable) {
String name = thread.getName();
thread.setName(name + "[" + runnable.getIndexName() + "]");
}
return thread;
}, new ThreadContext(settings), EsExecutors.TaskTrackingConfig.DEFAULT);
}

public boolean isTransitionRunning(String indexName) {
return runningTransitions.contains(indexName);
public boolean transitionSubmitted(String indexName) {
return submittedTransitions.containsKey(indexName);
}

public boolean hasCapacity() {
return runningTransitions.size() < maxConcurrency;
return submittedTransitions.size() < (maxConcurrency + maxQueueSize);
}

public List<Runnable> shutdownNow() {
Expand All @@ -69,36 +72,54 @@ public List<Runnable> shutdownNow() {

public Future<?> submit(DlmFrozenTransitionRunnable task) {
final String indexName = task.getIndexName();
runningTransitions.add(indexName);
submittedTransitions.put(indexName, false);
try {
return executor.submit(wrapRunnable(task));
} catch (Exception e) {
runningTransitions.remove(indexName);
submittedTransitions.remove(indexName);
throw e;
}
}

/**
* Wraps the task with index tracking and error handling. Ensures the index name is always removed from
* {@link #runningTransitions} when the thread completes, whether successfully or with an error.
* {@link #submittedTransitions} when the thread completes, whether successfully or with an error.
*/
private Runnable wrapRunnable(DlmFrozenTransitionRunnable task) {
return () -> {
return new WrappedDlmFrozenTransitionRunnable(task);
}

@Override
public void close() {
ThreadPool.terminate(executor, 10, TimeUnit.SECONDS);
}

private class WrappedDlmFrozenTransitionRunnable implements Runnable {
private final DlmFrozenTransitionRunnable task;

private WrappedDlmFrozenTransitionRunnable(DlmFrozenTransitionRunnable task) {
this.task = task;
}

@Override
public void run() {
final String indexName = task.getIndexName();
try {
logger.debug("Starting transition for index [{}]", indexName);
Boolean previousValue = submittedTransitions.put(indexName, true);
assert Boolean.FALSE.equals(previousValue)
: "expected the previous value to exist and be false, but it was " + previousValue;
task.run();
logger.debug("Transition completed for index [{}]", indexName);
} catch (Exception ex) {
logger.error(() -> Strings.format("Error executing transition for index [%s]", indexName), ex);
} finally {
runningTransitions.remove(indexName);
submittedTransitions.remove(indexName);
}
};
}
}

@Override
public void close() {
ThreadPool.terminate(executor, 10, TimeUnit.SECONDS);
private String getIndexName() {
return task.getIndexName();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ public Collection<?> createComponents(PluginServices services) {
@Override
public List<Setting<?>> getSettings() {
if (DataStreamLifecycle.DLM_SEARCHABLE_SNAPSHOTS_FEATURE_FLAG.isEnabled()) {
return List.of(DlmFrozenTransitionService.POLL_INTERVAL_SETTING, DlmFrozenTransitionService.MAX_CONCURRENCY_SETTING);
return List.of(
DlmFrozenTransitionService.POLL_INTERVAL_SETTING,
DlmFrozenTransitionService.MAX_CONCURRENCY_SETTING,
DlmFrozenTransitionService.MAX_QUEUE_SIZE
);
} else {
return List.of();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.index.Index;
Expand Down Expand Up @@ -58,13 +59,22 @@ class DlmFrozenTransitionService implements ClusterStateListener, Closeable {
Setting.Property.NodeScope
);

static final Setting<Integer> MAX_QUEUE_SIZE = Setting.intSetting(
"dlm.frozen_transition.max_queue_size",
500,
1,
10000,
Setting.Property.NodeScope
);

private final ClusterService clusterService;
private final AtomicBoolean isMaster = new AtomicBoolean(false);
private ScheduledExecutorService schedulerThreadExecutor;
private DlmFrozenTransitionExecutor transitionExecutor;
private final AtomicBoolean closing = new AtomicBoolean(false);
private final TimeValue pollInterval;
private final int maxConcurrency;
private final int maxQueueSize;
private final long initialDelayMillis;

private final BiFunction<String, ProjectId, DlmFrozenTransitionRunnable> transitionRunnableFactory;
Expand Down Expand Up @@ -93,6 +103,7 @@ private DlmFrozenTransitionService(
this.clusterService = clusterService;
this.pollInterval = POLL_INTERVAL_SETTING.get(clusterService.getSettings());
this.maxConcurrency = MAX_CONCURRENCY_SETTING.get(clusterService.getSettings());
this.maxQueueSize = MAX_QUEUE_SIZE.get(clusterService.getSettings());
this.transitionRunnableFactory = transitionRunnableFactory;
this.initialDelayMillis = initialDelayMillis;
}
Expand Down Expand Up @@ -124,12 +135,10 @@ public void clusterChanged(ClusterChangedEvent event) {
private void startThreadPools() {
synchronized (this) {
if (closing.get() == false) {
transitionExecutor = new DlmFrozenTransitionExecutor(maxConcurrency, clusterService.getSettings());
schedulerThreadExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
var thread = new Thread(r, "dlm-frozen-transition-scheduler");
thread.setDaemon(true);
return thread;
});
transitionExecutor = new DlmFrozenTransitionExecutor(maxConcurrency, maxQueueSize, clusterService.getSettings());
schedulerThreadExecutor = Executors.newSingleThreadScheduledExecutor(
EsExecutors.daemonThreadFactory(clusterService.getSettings(), "dlm-frozen-transition-scheduler")
);
schedulerThreadExecutor.scheduleAtFixedRate(
this::checkForFrozenIndices,
initialDelayMillis,
Expand Down Expand Up @@ -204,7 +213,7 @@ void checkForFrozenIndices() {
}
if (indexMarkedForFrozen(projectMetadata.index(index))) {
logger.debug("Frozen index to process detected: {}", index);
if (executor.isTransitionRunning(index.getName())) {
if (executor.transitionSubmitted(index.getName())) {
logger.debug("Transition already running for index [{}], skipping", index);
continue;
} else if (executor.hasCapacity() == false) {
Expand Down

This file was deleted.

Loading
Loading