diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java index eb4ba12c094d4..914cac69d02f0 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java @@ -44,7 +44,6 @@ import org.elasticsearch.datastreams.action.TransportPromoteDataStreamAction; import org.elasticsearch.datastreams.action.TransportUpdateDataStreamMappingsAction; import org.elasticsearch.datastreams.action.TransportUpdateDataStreamSettingsAction; -import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleErrorStore; import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService; import org.elasticsearch.datastreams.lifecycle.action.DeleteDataStreamLifecycleAction; import org.elasticsearch.datastreams.lifecycle.action.GetDataStreamLifecycleStatsAction; @@ -144,7 +143,6 @@ public static TimeValue getLookAheadTime(Settings settings) { Setting.Property.Dynamic, Setting.Property.ServerlessPublic ); - private final SetOnce errorStoreInitialisationService = new SetOnce<>(); private final SetOnce dataLifecycleInitialisationService = new SetOnce<>(); private final SetOnce dataStreamLifecycleErrorsPublisher = new SetOnce<>(); @@ -202,14 +200,8 @@ public Collection createComponents(PluginServices services) { additionalLookAheadTimeValidation(value, timeSeriesPollInterval); }); components.add(updateTimeSeriesRangeService); - errorStoreInitialisationService.set(new DataStreamLifecycleErrorStore(services.threadPool()::absoluteTimeInMillis)); dataStreamLifecycleErrorsPublisher.set( - new DataStreamLifecycleHealthInfoPublisher( - settings, - services.client(), - services.clusterService(), - errorStoreInitialisationService.get() - ) + new DataStreamLifecycleHealthInfoPublisher(settings, services.client(), services.clusterService(), services.dlmErrorStore()) ); dataLifecycleInitialisationService.set( @@ -220,7 +212,7 @@ public Collection createComponents(PluginServices services) { getClock(), services.threadPool(), services.threadPool()::absoluteTimeInMillis, - errorStoreInitialisationService.get(), + services.dlmErrorStore(), services.allocationService(), dataStreamLifecycleErrorsPublisher.get(), services.dataStreamGlobalRetentionSettings() @@ -229,7 +221,6 @@ public Collection createComponents(PluginServices services) { dataLifecycleInitialisationService.get().init(); dataStreamLifecycleHealthIndicatorService.set(new DataStreamLifecycleHealthIndicatorService(services.projectResolver())); - components.add(errorStoreInitialisationService.get()); components.add(dataLifecycleInitialisationService.get()); components.add(dataStreamLifecycleErrorsPublisher.get()); return components; diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java index e6d86705ed40f..77dd16957e585 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java @@ -72,6 +72,7 @@ import org.elasticsearch.datastreams.lifecycle.downsampling.DeleteSourceAndAddDownsampleToDS; import org.elasticsearch.datastreams.lifecycle.health.DataStreamLifecycleHealthInfoPublisher; import org.elasticsearch.datastreams.lifecycle.transitions.steps.MarkIndexForDLMForceMergeAction; +import org.elasticsearch.dlm.DataStreamLifecycleErrorStore; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexMode; diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/ErrorRecordingActionListener.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/ErrorRecordingActionListener.java index bf1ac18c7b98a..40f4ed2938cb5 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/ErrorRecordingActionListener.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/ErrorRecordingActionListener.java @@ -12,6 +12,7 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.dlm.DataStreamLifecycleErrorStore; import static org.apache.logging.log4j.LogManager.getLogger; diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportExplainDataStreamLifecycleAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportExplainDataStreamLifecycleAction.java index 65ea41a0eac7a..e48a40023a164 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportExplainDataStreamLifecycleAction.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportExplainDataStreamLifecycleAction.java @@ -29,7 +29,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.core.TimeValue; -import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleErrorStore; +import org.elasticsearch.dlm.DataStreamLifecycleErrorStore; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/health/DataStreamLifecycleHealthInfoPublisher.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/health/DataStreamLifecycleHealthInfoPublisher.java index b33eeed2d47aa..9f30d6fdc9f8e 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/health/DataStreamLifecycleHealthInfoPublisher.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/health/DataStreamLifecycleHealthInfoPublisher.java @@ -18,7 +18,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleErrorStore; +import org.elasticsearch.dlm.DataStreamLifecycleErrorStore; import org.elasticsearch.health.node.DataStreamLifecycleHealthInfo; import org.elasticsearch.health.node.DslErrorInfo; import org.elasticsearch.health.node.UpdateHealthInfoCacheAction; diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleErrorStoreTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleErrorStoreTests.java index 00a75d4dffba5..9b2e088bb2da5 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleErrorStoreTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleErrorStoreTests.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.datastreams.lifecycle.ErrorEntry; import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.dlm.DataStreamLifecycleErrorStore; import org.elasticsearch.health.node.DslErrorInfo; import org.elasticsearch.test.ESTestCase; import org.junit.Before; @@ -21,7 +22,7 @@ import java.util.stream.IntStream; import java.util.stream.Stream; -import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleErrorStore.MAX_ERROR_MESSAGE_LENGTH; +import static org.elasticsearch.dlm.DataStreamLifecycleErrorStore.MAX_ERROR_MESSAGE_LENGTH; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.hasItem; diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java index 957e51719766a..507e91078fcb2 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java @@ -69,6 +69,7 @@ import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; import org.elasticsearch.datastreams.lifecycle.health.DataStreamLifecycleHealthInfoPublisher; +import org.elasticsearch.dlm.DataStreamLifecycleErrorStore; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.IndexSettings; diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/action/TransportGetDataStreamLifecycleStatsActionTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/action/TransportGetDataStreamLifecycleStatsActionTests.java index 6f110078fb122..7ee68e561a27e 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/action/TransportGetDataStreamLifecycleStatsActionTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/action/TransportGetDataStreamLifecycleStatsActionTests.java @@ -20,8 +20,8 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; -import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleErrorStore; import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService; +import org.elasticsearch.dlm.DataStreamLifecycleErrorStore; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.test.ESTestCase; diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/health/DataStreamLifecycleHealthInfoPublisherTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/health/DataStreamLifecycleHealthInfoPublisherTests.java index 5e2df752df6cd..23e00fad13d90 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/health/DataStreamLifecycleHealthInfoPublisherTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/health/DataStreamLifecycleHealthInfoPublisherTests.java @@ -24,8 +24,8 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleErrorStore; import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService; +import org.elasticsearch.dlm.DataStreamLifecycleErrorStore; import org.elasticsearch.health.node.DataStreamLifecycleHealthInfo; import org.elasticsearch.health.node.DslErrorInfo; import org.elasticsearch.health.node.UpdateHealthInfoCacheAction; diff --git a/server/src/main/java/module-info.java b/server/src/main/java/module-info.java index 1ee9a74f7557e..be5a02a73afcc 100644 --- a/server/src/main/java/module-info.java +++ b/server/src/main/java/module-info.java @@ -530,4 +530,5 @@ exports org.elasticsearch.search.diversification; exports org.elasticsearch.search.diversification.mmr; exports org.elasticsearch.inference.completion; + exports org.elasticsearch.dlm; } diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleErrorStore.java b/server/src/main/java/org/elasticsearch/dlm/DataStreamLifecycleErrorStore.java similarity index 99% rename from modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleErrorStore.java rename to server/src/main/java/org/elasticsearch/dlm/DataStreamLifecycleErrorStore.java index 59b005f3db591..0b628c3164146 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleErrorStore.java +++ b/server/src/main/java/org/elasticsearch/dlm/DataStreamLifecycleErrorStore.java @@ -7,7 +7,7 @@ * License v3.0 only", or the "Server Side Public License, v 1". */ -package org.elasticsearch.datastreams.lifecycle; +package org.elasticsearch.dlm; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.Message; diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index 611861ff0289f..27fe5c0f6b88e 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -93,6 +93,7 @@ import org.elasticsearch.core.Tuple; import org.elasticsearch.core.UpdateForV10; import org.elasticsearch.discovery.DiscoveryModule; +import org.elasticsearch.dlm.DataStreamLifecycleErrorStore; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.features.FeatureService; @@ -1022,6 +1023,8 @@ public void sendRequest( final CrossProjectModeDecider crossProjectModeDecider = new CrossProjectModeDecider(settings); + final DataStreamLifecycleErrorStore dlmErrorStore = new DataStreamLifecycleErrorStore(threadPool::absoluteTimeInMillis); + PluginServiceInstances pluginServices = new PluginServiceInstances( client, clusterService, @@ -1050,7 +1053,8 @@ public void sendRequest( linkedProjectConfigService, projectRoutingResolver, remoteTransportClient, - crossProjectModeDecider + crossProjectModeDecider, + dlmErrorStore ); Collection pluginComponents = pluginsService.flatMap(plugin -> { @@ -1411,6 +1415,7 @@ public void sendRequest( b.bind(ProjectRoutingResolver.class).toInstance(projectRoutingResolver); b.bind(ActionLoggingFieldsProvider.class).toInstance(loggingFieldsProvider); b.bind(ActivityLogWriterProvider.class).toInstance(logWriterProvider); + b.bind(DataStreamLifecycleErrorStore.class).toInstance(dlmErrorStore); }); if (ReadinessService.enabled(environment)) { diff --git a/server/src/main/java/org/elasticsearch/node/PluginServiceInstances.java b/server/src/main/java/org/elasticsearch/node/PluginServiceInstances.java index 7b8657dc19deb..413b88aeb6db9 100644 --- a/server/src/main/java/org/elasticsearch/node/PluginServiceInstances.java +++ b/server/src/main/java/org/elasticsearch/node/PluginServiceInstances.java @@ -18,6 +18,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.dlm.DataStreamLifecycleErrorStore; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.features.FeatureService; @@ -67,5 +68,6 @@ public record PluginServiceInstances( LinkedProjectConfigService linkedProjectConfigService, ProjectRoutingResolver projectRoutingResolver, RemoteTransportClient remoteTransportClient, - CrossProjectModeDecider crossProjectModeDecider + CrossProjectModeDecider crossProjectModeDecider, + DataStreamLifecycleErrorStore dlmErrorStore ) implements Plugin.PluginServices {} diff --git a/server/src/main/java/org/elasticsearch/plugins/Plugin.java b/server/src/main/java/org/elasticsearch/plugins/Plugin.java index 877832f1f921d..8d20d670047da 100644 --- a/server/src/main/java/org/elasticsearch/plugins/Plugin.java +++ b/server/src/main/java/org/elasticsearch/plugins/Plugin.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.dlm.DataStreamLifecycleErrorStore; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.features.FeatureService; @@ -216,6 +217,9 @@ public interface PluginServices { /** A service to determine whether Cross-Project Search applies to a request */ CrossProjectModeDecider crossProjectModeDecider(); + + /** A utility for recording lifecycle errors for data stream lifecycles */ + DataStreamLifecycleErrorStore dlmErrorStore(); } /** diff --git a/x-pack/plugin/dlm-frozen-transition/src/main/java/org/elasticsearch/xpack/dlm/frozen/DataStreamLifecycleConvertToFrozen.java b/x-pack/plugin/dlm-frozen-transition/src/main/java/org/elasticsearch/xpack/dlm/frozen/DataStreamLifecycleConvertToFrozen.java index 01480bde15abd..b4a4a5aa7cf33 100644 --- a/x-pack/plugin/dlm-frozen-transition/src/main/java/org/elasticsearch/xpack/dlm/frozen/DataStreamLifecycleConvertToFrozen.java +++ b/x-pack/plugin/dlm-frozen-transition/src/main/java/org/elasticsearch/xpack/dlm/frozen/DataStreamLifecycleConvertToFrozen.java @@ -529,6 +529,11 @@ public String getIndexName() { return indexName; } + @Override + public ProjectId getProjectId() { + return projectId; + } + private ProjectState getProjectState() { return clusterService.state().projectState(projectId); } diff --git a/x-pack/plugin/dlm-frozen-transition/src/main/java/org/elasticsearch/xpack/dlm/frozen/DlmFrozenTransitionExecutor.java b/x-pack/plugin/dlm-frozen-transition/src/main/java/org/elasticsearch/xpack/dlm/frozen/DlmFrozenTransitionExecutor.java index 1c07ebec57a29..545f51efd8789 100644 --- a/x-pack/plugin/dlm-frozen-transition/src/main/java/org/elasticsearch/xpack/dlm/frozen/DlmFrozenTransitionExecutor.java +++ b/x-pack/plugin/dlm-frozen-transition/src/main/java/org/elasticsearch/xpack/dlm/frozen/DlmFrozenTransitionExecutor.java @@ -11,6 +11,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.Strings; +import org.elasticsearch.dlm.DataStreamLifecycleErrorStore; import org.elasticsearch.logging.Logger; import org.elasticsearch.threadpool.ThreadPool; @@ -42,8 +43,9 @@ class DlmFrozenTransitionExecutor implements Closeable { private final ExecutorService executor; private final int maxConcurrency; private final int maxQueueSize; + private final DataStreamLifecycleErrorStore errorStore; - DlmFrozenTransitionExecutor(int maxConcurrency, int maxQueueSize, Settings settings) { + DlmFrozenTransitionExecutor(int maxConcurrency, int maxQueueSize, Settings settings, DataStreamLifecycleErrorStore errorStore) { this.maxConcurrency = maxConcurrency; this.maxQueueSize = maxQueueSize; this.submittedTransitions = new ConcurrentHashMap<>(maxQueueSize); @@ -56,6 +58,7 @@ class DlmFrozenTransitionExecutor implements Closeable { } return thread; }, new ThreadContext(settings), EsExecutors.TaskTrackingConfig.DEFAULT); + this.errorStore = errorStore; } public boolean transitionSubmitted(String indexName) { @@ -112,7 +115,13 @@ public void run() { task.run(); logger.debug("Transition completed for index [{}]", indexName); } catch (Exception ex) { - logger.error(() -> Strings.format("Error executing transition for index [%s]", indexName), ex); + errorStore.recordAndLogError( + task.getProjectId(), + indexName, + ex, + Strings.format("Error executing transition for index [%s]", indexName), + 1 + ); } finally { submittedTransitions.remove(indexName); } diff --git a/x-pack/plugin/dlm-frozen-transition/src/main/java/org/elasticsearch/xpack/dlm/frozen/DlmFrozenTransitionPlugin.java b/x-pack/plugin/dlm-frozen-transition/src/main/java/org/elasticsearch/xpack/dlm/frozen/DlmFrozenTransitionPlugin.java index 1d15d61a4c589..86617348c475d 100644 --- a/x-pack/plugin/dlm-frozen-transition/src/main/java/org/elasticsearch/xpack/dlm/frozen/DlmFrozenTransitionPlugin.java +++ b/x-pack/plugin/dlm-frozen-transition/src/main/java/org/elasticsearch/xpack/dlm/frozen/DlmFrozenTransitionPlugin.java @@ -28,7 +28,12 @@ public Collection createComponents(PluginServices services) { Set components = new HashSet<>(super.createComponents(services)); if (DataStreamLifecycle.DLM_SEARCHABLE_SNAPSHOTS_FEATURE_FLAG.isEnabled()) { XPackLicenseState licenseState = XPackPlugin.getSharedLicenseState(); - var service = new DlmFrozenTransitionService(services.clusterService(), services.client(), licenseState); + var service = new DlmFrozenTransitionService( + services.clusterService(), + services.client(), + licenseState, + services.dlmErrorStore() + ); service.init(); components.add(service); } diff --git a/x-pack/plugin/dlm-frozen-transition/src/main/java/org/elasticsearch/xpack/dlm/frozen/DlmFrozenTransitionRunnable.java b/x-pack/plugin/dlm-frozen-transition/src/main/java/org/elasticsearch/xpack/dlm/frozen/DlmFrozenTransitionRunnable.java index eeb8b583cbf00..a9ee49b670a46 100644 --- a/x-pack/plugin/dlm-frozen-transition/src/main/java/org/elasticsearch/xpack/dlm/frozen/DlmFrozenTransitionRunnable.java +++ b/x-pack/plugin/dlm-frozen-transition/src/main/java/org/elasticsearch/xpack/dlm/frozen/DlmFrozenTransitionRunnable.java @@ -7,9 +7,13 @@ package org.elasticsearch.xpack.dlm.frozen; +import org.elasticsearch.cluster.metadata.ProjectId; + /** * A runnable task associated with a specific index transition. */ interface DlmFrozenTransitionRunnable extends Runnable { String getIndexName(); + + ProjectId getProjectId(); } diff --git a/x-pack/plugin/dlm-frozen-transition/src/main/java/org/elasticsearch/xpack/dlm/frozen/DlmFrozenTransitionService.java b/x-pack/plugin/dlm-frozen-transition/src/main/java/org/elasticsearch/xpack/dlm/frozen/DlmFrozenTransitionService.java index fb609f40efb5a..689cfbecc0ec1 100644 --- a/x-pack/plugin/dlm-frozen-transition/src/main/java/org/elasticsearch/xpack/dlm/frozen/DlmFrozenTransitionService.java +++ b/x-pack/plugin/dlm-frozen-transition/src/main/java/org/elasticsearch/xpack/dlm/frozen/DlmFrozenTransitionService.java @@ -18,6 +18,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.dlm.DataStreamLifecycleErrorStore; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.index.Index; import org.elasticsearch.license.XPackLicenseState; @@ -76,14 +77,21 @@ class DlmFrozenTransitionService implements ClusterStateListener, Closeable { private final int maxConcurrency; private final int maxQueueSize; private final long initialDelayMillis; + private final DataStreamLifecycleErrorStore errorStore; private final BiFunction transitionRunnableFactory; - DlmFrozenTransitionService(ClusterService clusterService, Client client, XPackLicenseState licenseState) { + DlmFrozenTransitionService( + ClusterService clusterService, + Client client, + XPackLicenseState licenseState, + DataStreamLifecycleErrorStore errorStore + ) { this( clusterService, (index, pid) -> new DataStreamLifecycleConvertToFrozen(index, pid, client, clusterService, licenseState), - POLL_INTERVAL_SETTING.get(clusterService.getSettings()).millis() + POLL_INTERVAL_SETTING.get(clusterService.getSettings()).millis(), + errorStore ); } @@ -92,13 +100,14 @@ class DlmFrozenTransitionService implements ClusterStateListener, Closeable { ClusterService clusterService, BiFunction transitionRunnableFactory ) { - this(clusterService, transitionRunnableFactory, 0); + this(clusterService, transitionRunnableFactory, 0, new DataStreamLifecycleErrorStore(System::currentTimeMillis)); } private DlmFrozenTransitionService( ClusterService clusterService, BiFunction transitionRunnableFactory, - long initialDelayMillis + long initialDelayMillis, + DataStreamLifecycleErrorStore errorStore ) { this.clusterService = clusterService; this.pollInterval = POLL_INTERVAL_SETTING.get(clusterService.getSettings()); @@ -106,6 +115,7 @@ private DlmFrozenTransitionService( this.maxQueueSize = MAX_QUEUE_SIZE.get(clusterService.getSettings()); this.transitionRunnableFactory = transitionRunnableFactory; this.initialDelayMillis = initialDelayMillis; + this.errorStore = errorStore; } /** @@ -135,7 +145,12 @@ public void clusterChanged(ClusterChangedEvent event) { private void startThreadPools() { synchronized (this) { if (closing.get() == false) { - transitionExecutor = new DlmFrozenTransitionExecutor(maxConcurrency, maxQueueSize, clusterService.getSettings()); + transitionExecutor = new DlmFrozenTransitionExecutor( + maxConcurrency, + maxQueueSize, + clusterService.getSettings(), + errorStore + ); schedulerThreadExecutor = Executors.newSingleThreadScheduledExecutor( EsExecutors.daemonThreadFactory(clusterService.getSettings(), "dlm-frozen-transition-scheduler") ); diff --git a/x-pack/plugin/dlm-frozen-transition/src/test/java/org/elasticsearch/xpack/dlm/frozen/DlmFrozenTransitionExecutorTests.java b/x-pack/plugin/dlm-frozen-transition/src/test/java/org/elasticsearch/xpack/dlm/frozen/DlmFrozenTransitionExecutorTests.java index 162b7a7245c7d..eb8cfff5b3f95 100644 --- a/x-pack/plugin/dlm-frozen-transition/src/test/java/org/elasticsearch/xpack/dlm/frozen/DlmFrozenTransitionExecutorTests.java +++ b/x-pack/plugin/dlm-frozen-transition/src/test/java/org/elasticsearch/xpack/dlm/frozen/DlmFrozenTransitionExecutorTests.java @@ -7,8 +7,11 @@ package org.elasticsearch.xpack.dlm.frozen; +import org.elasticsearch.action.datastreams.lifecycle.ErrorEntry; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.WrappedRunnable; +import org.elasticsearch.dlm.DataStreamLifecycleErrorStore; import org.elasticsearch.test.ESTestCase; import java.util.ArrayList; @@ -23,10 +26,12 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import static org.hamcrest.Matchers.containsString; + public class DlmFrozenTransitionExecutorTests extends ESTestCase { public void testTransitionSubmitted() throws Exception { - try (var executor = new DlmFrozenTransitionExecutor(2, 10, Settings.EMPTY)) { + try (var executor = new DlmFrozenTransitionExecutor(2, 10, Settings.EMPTY, makeErrorStore())) { var task = new TestDlmFrozenTransitionRunnable("running-index"); task.blockUntil = new CountDownLatch(1); @@ -43,7 +48,7 @@ public void testTransitionSubmitted() throws Exception { } public void testTransitionRemovedAfterCompletion() throws Exception { - try (var executor = new DlmFrozenTransitionExecutor(2, 100, Settings.EMPTY)) { + try (var executor = new DlmFrozenTransitionExecutor(2, 100, Settings.EMPTY, makeErrorStore())) { var task = new TestDlmFrozenTransitionRunnable("done-index"); executor.submit(task).get(10, TimeUnit.SECONDS); @@ -53,17 +58,21 @@ public void testTransitionRemovedAfterCompletion() throws Exception { } public void testTransitionRemovedAfterFailure() throws Exception { - try (var executor = new DlmFrozenTransitionExecutor(2, 100, Settings.EMPTY)) { + var errorStore = makeErrorStore(); + try (var executor = new DlmFrozenTransitionExecutor(2, 100, Settings.EMPTY, errorStore)) { var runtimeTask = new TestDlmFrozenTransitionRunnable("exception-index"); runtimeTask.throwOnRun = new IllegalStateException("simulated failure"); executor.submit(runtimeTask).get(10, TimeUnit.SECONDS); assertFalse(executor.transitionSubmitted("exception-index")); + ErrorEntry err = errorStore.getError(ProjectId.DEFAULT, "exception-index"); + assertNotNull("expected an error to be recorded in the error store", err); + assertThat(err.error(), containsString("simulated failure")); } } public void testHasCapacity() throws Exception { int maxQueue = randomIntBetween(2, 50); - try (var executor = new DlmFrozenTransitionExecutor(1, maxQueue, Settings.EMPTY)) { + try (var executor = new DlmFrozenTransitionExecutor(1, maxQueue, Settings.EMPTY, makeErrorStore())) { CountDownLatch tasksStarted = new CountDownLatch(1); CountDownLatch firstTaskBlock = new CountDownLatch(1); CountDownLatch taskBlock = new CountDownLatch(1); @@ -93,7 +102,7 @@ public void testHasCapacity() throws Exception { } public void testShutdownNow() throws Exception { - var executor = new DlmFrozenTransitionExecutor(1, 10, Settings.EMPTY); + var executor = new DlmFrozenTransitionExecutor(1, 10, Settings.EMPTY, makeErrorStore()); var task = new TestDlmFrozenTransitionRunnable("block-index"); task.blockUntil = new CountDownLatch(1); @@ -112,7 +121,7 @@ public void testShutdownNow() throws Exception { * This is the invariant that {@code checkForFrozenIndices} relies on to prevent re-submission of queued tasks. */ public void testTransitionSubmittedReturnsTrueForQueuedTask() throws Exception { - try (var executor = new DlmFrozenTransitionExecutor(1, 2, Settings.EMPTY)) { + try (var executor = new DlmFrozenTransitionExecutor(1, 2, Settings.EMPTY, makeErrorStore())) { CountDownLatch firstStarted = new CountDownLatch(1); CountDownLatch block = new CountDownLatch(1); @@ -138,7 +147,7 @@ public void testTransitionSubmittedReturnsTrueForQueuedTask() throws Exception { * must remove the index from {@code submittedTransitions} before rethrowing, so that a future poll can retry. */ public void testSubmitCleansUpEntryOnRejectedExecution() throws Exception { - var executor = new DlmFrozenTransitionExecutor(1, 1, Settings.EMPTY); + var executor = new DlmFrozenTransitionExecutor(1, 1, Settings.EMPTY, makeErrorStore()); try { CountDownLatch block = new CountDownLatch(1); CountDownLatch firstStarted = new CountDownLatch(1); @@ -171,7 +180,7 @@ public void testSubmitCleansUpEntryOnRejectedExecution() throws Exception { * and had not yet started, not only the currently-executing task. */ public void testShutdownNowReturnsQueuedTasks() throws Exception { - var executor = new DlmFrozenTransitionExecutor(1, 5, Settings.EMPTY); + var executor = new DlmFrozenTransitionExecutor(1, 5, Settings.EMPTY, makeErrorStore()); CountDownLatch block = new CountDownLatch(1); CountDownLatch firstStarted = new CountDownLatch(1); @@ -207,7 +216,7 @@ public void testShutdownNowReturnsQueuedTasks() throws Exception { */ public void testSimultaneousSubmissionsFromMultipleThreads() throws Exception { int maxConcurrency = between(2, 50); - try (var executor = new DlmFrozenTransitionExecutor(maxConcurrency, 1, Settings.EMPTY)) { + try (var executor = new DlmFrozenTransitionExecutor(maxConcurrency, 1, Settings.EMPTY, makeErrorStore())) { CyclicBarrier barrier = new CyclicBarrier(maxConcurrency + 1); List> futures = new CopyOnWriteArrayList<>(); List errors = new CopyOnWriteArrayList<>(); @@ -259,6 +268,11 @@ public String getIndexName() { return indexName; } + @Override + public ProjectId getProjectId() { + return ProjectId.DEFAULT; + } + @Override public void run() { started.countDown(); @@ -275,4 +289,8 @@ public void run() { } } } + + private DataStreamLifecycleErrorStore makeErrorStore() { + return new DataStreamLifecycleErrorStore(System::currentTimeMillis); + } } diff --git a/x-pack/plugin/dlm-frozen-transition/src/test/java/org/elasticsearch/xpack/dlm/frozen/DlmFrozenTransitionServiceTests.java b/x-pack/plugin/dlm-frozen-transition/src/test/java/org/elasticsearch/xpack/dlm/frozen/DlmFrozenTransitionServiceTests.java index c5c4090fe6e43..33e15d223c100 100644 --- a/x-pack/plugin/dlm-frozen-transition/src/test/java/org/elasticsearch/xpack/dlm/frozen/DlmFrozenTransitionServiceTests.java +++ b/x-pack/plugin/dlm-frozen-transition/src/test/java/org/elasticsearch/xpack/dlm/frozen/DlmFrozenTransitionServiceTests.java @@ -13,6 +13,7 @@ import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -64,6 +65,11 @@ public String getIndexName() { return indexName; } + @Override + public ProjectId getProjectId() { + return ProjectId.DEFAULT; + } + @Override public void run() { started.countDown(); diff --git a/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/DataStreamLifecycleDownsamplingSecurityIT.java b/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/DataStreamLifecycleDownsamplingSecurityIT.java index c37cd36dacb64..8b6c5fba2bfe2 100644 --- a/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/DataStreamLifecycleDownsamplingSecurityIT.java +++ b/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/DataStreamLifecycleDownsamplingSecurityIT.java @@ -35,8 +35,8 @@ import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.datastreams.DataStreamsPlugin; -import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleErrorStore; import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService; +import org.elasticsearch.dlm.DataStreamLifecycleErrorStore; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.IndexSettings; diff --git a/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/DataStreamLifecycleServiceRuntimeSecurityIT.java b/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/DataStreamLifecycleServiceRuntimeSecurityIT.java index 513f231f2a826..536ec5459b363 100644 --- a/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/DataStreamLifecycleServiceRuntimeSecurityIT.java +++ b/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/DataStreamLifecycleServiceRuntimeSecurityIT.java @@ -29,8 +29,8 @@ import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.datastreams.DataStreamsPlugin; -import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleErrorStore; import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService; +import org.elasticsearch.dlm.DataStreamLifecycleErrorStore; import org.elasticsearch.index.Index; import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.index.mapper.extras.MapperExtrasPlugin;