Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import org.elasticsearch.health.HealthIndicatorService;
import org.elasticsearch.index.IndexSettingProvider;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.ExtensiblePlugin;
import org.elasticsearch.plugins.HealthPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestHandler;
Expand All @@ -100,7 +101,7 @@

import static org.elasticsearch.cluster.metadata.DataStreamLifecycle.DATA_STREAM_LIFECYCLE_ORIGIN;

public class DataStreamsPlugin extends Plugin implements ActionPlugin, HealthPlugin {
public class DataStreamsPlugin extends Plugin implements ActionPlugin, ExtensiblePlugin, HealthPlugin {

public static final Setting<TimeValue> TIME_SERIES_POLL_INTERVAL = Setting.timeSetting(
"time_series.poll_interval",
Expand Down
27 changes: 27 additions & 0 deletions x-pack/plugin/dlm-frozen-transition/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

apply plugin: 'elasticsearch.internal-es-plugin'
apply plugin: 'elasticsearch.internal-cluster-test'
esplugin {
name = 'dlm-frozen-transition'
description = 'A plugin for the frozen tier functionality of DLM'
classname = 'org.elasticsearch.xpack.dlm.frozen.DlmFrozenTransitionPlugin'
extendedPlugins = ['data-streams', 'x-pack-core']
}
base {
archivesName = 'x-pack-dlm-frozen-transition'
}

dependencies {
compileOnly project(path: xpackModule('core'))
compileOnly project(':modules:data-streams')
testImplementation(testArtifact(project(xpackModule('core'))))
testImplementation project(':modules:data-streams')
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* 2.0.
*/

package org.elasticsearch.xpack.core.datastreams;
package org.elasticsearch.xpack.dlm.frozen;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -53,7 +53,7 @@
/**
* This class encapsulates the steps necessary to convert a data stream backing index to frozen.
*/
public class DataStreamLifecycleConvertToFrozen implements Runnable {
public class DataStreamLifecycleConvertToFrozen implements DlmFrozenTransitionRunnable {

private static final Logger logger = LogManager.getLogger(DataStreamLifecycleConvertToFrozen.class);
public static final String CLONE_INDEX_PREFIX = "dlm-clone-";
Expand Down Expand Up @@ -417,4 +417,9 @@ private void validateAddIndexBlockResponse(AddIndexBlockRequest addIndexBlockReq
}
}
}

@Override
public String getIndexName() {
return indexName;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.dlm.frozen;

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Strings;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.Closeable;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.logging.LogManager.getLogger;

/**
* DlmFrozenTransitionExecutor is responsible for managing and executing tasks related to
* frozen transitions in the distributed lifecycle management (DLM) feature.
* <br>
* This executor limits the number of concurrent transition tasks based on a configurable capacity
* and prevents transitions being executed concurrently for the same index.
* It also ensures that tasks are tracked and cleaned up upon completion or failure.
*/
class DlmFrozenTransitionExecutor implements Closeable {

private static final Logger logger = getLogger(DlmFrozenTransitionExecutor.class);

private static final String EXECUTOR_NAME = "dlm-frozen-transition";

private final Set<String> runningTransitions;
private final int maxConcurrency;
private final ExecutorService executor;

DlmFrozenTransitionExecutor(int maxConcurrency, Settings settings) {
this.runningTransitions = ConcurrentHashMap.newKeySet(maxConcurrency);
this.maxConcurrency = maxConcurrency;
this.executor = EsExecutors.newFixed(
EXECUTOR_NAME,
maxConcurrency,
-1,
EsExecutors.daemonThreadFactory(settings, EXECUTOR_NAME),
new ThreadContext(settings),
EsExecutors.TaskTrackingConfig.DEFAULT
);
}

public boolean isTransitionRunning(String indexName) {
return runningTransitions.contains(indexName);
}

public boolean hasCapacity() {
return runningTransitions.size() < maxConcurrency;
}

public List<Runnable> shutdownNow() {
return executor.shutdownNow();
}

public Future<?> submit(DlmFrozenTransitionRunnable task) {
final String indexName = task.getIndexName();
runningTransitions.add(indexName);
try {
return executor.submit(wrapRunnable(task));
} catch (Exception e) {
runningTransitions.remove(indexName);
throw e;
}
}

/**
* Wraps the task with index tracking and error handling. Ensures the index name is always removed from
* {@link #runningTransitions} when the thread completes, whether successfully or with an error.
*/
private Runnable wrapRunnable(DlmFrozenTransitionRunnable task) {
return () -> {
final String indexName = task.getIndexName();
try {
logger.debug("Starting transition for index [{}]", indexName);
task.run();
logger.debug("Transition completed for index [{}]", indexName);
} catch (Exception ex) {
logger.error(() -> Strings.format("Error executing transition for index [%s]", indexName), ex);
} finally {
runningTransitions.remove(indexName);
}
};
}

@Override
public void close() {
ThreadPool.terminate(executor, 10, TimeUnit.SECONDS);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.dlm.frozen;

import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.xpack.core.XPackPlugin;

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
* Plugin that registers the {@link DlmFrozenTransitionService} for converting data stream backing indices to the frozen tier as part of
* the data stream lifecycle. Only active when the searchable snapshots feature flag is enabled.
*/
public class DlmFrozenTransitionPlugin extends Plugin {

@Override
public Collection<?> createComponents(PluginServices services) {
Set<Object> components = new HashSet<>(super.createComponents(services));
if (DataStreamLifecycle.DLM_SEARCHABLE_SNAPSHOTS_FEATURE_FLAG.isEnabled()) {
XPackLicenseState licenseState = XPackPlugin.getSharedLicenseState();
var service = new DlmFrozenTransitionService(services.clusterService(), services.client(), licenseState);
service.init();
components.add(service);
}
return components;
}

@Override
public List<Setting<?>> getSettings() {
if (DataStreamLifecycle.DLM_SEARCHABLE_SNAPSHOTS_FEATURE_FLAG.isEnabled()) {
return List.of(DlmFrozenTransitionService.POLL_INTERVAL_SETTING, DlmFrozenTransitionService.MAX_CONCURRENCY_SETTING);
} else {
return List.of();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.dlm.frozen;

/**
* A runnable task associated with a specific index transition.
*/
interface DlmFrozenTransitionRunnable extends Runnable {
String getIndexName();
}
Loading
Loading