Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.elasticsearch.datastreams.action.TransportPromoteDataStreamAction;
import org.elasticsearch.datastreams.action.TransportUpdateDataStreamMappingsAction;
import org.elasticsearch.datastreams.action.TransportUpdateDataStreamSettingsAction;
import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleErrorStore;
import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService;
import org.elasticsearch.datastreams.lifecycle.action.DeleteDataStreamLifecycleAction;
import org.elasticsearch.datastreams.lifecycle.action.GetDataStreamLifecycleStatsAction;
Expand Down Expand Up @@ -144,7 +143,6 @@ public static TimeValue getLookAheadTime(Settings settings) {
Setting.Property.Dynamic,
Setting.Property.ServerlessPublic
);
private final SetOnce<DataStreamLifecycleErrorStore> errorStoreInitialisationService = new SetOnce<>();

private final SetOnce<DataStreamLifecycleService> dataLifecycleInitialisationService = new SetOnce<>();
private final SetOnce<DataStreamLifecycleHealthInfoPublisher> dataStreamLifecycleErrorsPublisher = new SetOnce<>();
Expand Down Expand Up @@ -202,14 +200,8 @@ public Collection<?> createComponents(PluginServices services) {
additionalLookAheadTimeValidation(value, timeSeriesPollInterval);
});
components.add(updateTimeSeriesRangeService);
errorStoreInitialisationService.set(new DataStreamLifecycleErrorStore(services.threadPool()::absoluteTimeInMillis));
dataStreamLifecycleErrorsPublisher.set(
new DataStreamLifecycleHealthInfoPublisher(
settings,
services.client(),
services.clusterService(),
errorStoreInitialisationService.get()
)
new DataStreamLifecycleHealthInfoPublisher(settings, services.client(), services.clusterService(), services.dlmErrorStore())
);

dataLifecycleInitialisationService.set(
Expand All @@ -220,7 +212,7 @@ public Collection<?> createComponents(PluginServices services) {
getClock(),
services.threadPool(),
services.threadPool()::absoluteTimeInMillis,
errorStoreInitialisationService.get(),
services.dlmErrorStore(),
services.allocationService(),
dataStreamLifecycleErrorsPublisher.get(),
services.dataStreamGlobalRetentionSettings()
Expand All @@ -229,7 +221,6 @@ public Collection<?> createComponents(PluginServices services) {
dataLifecycleInitialisationService.get().init();
dataStreamLifecycleHealthIndicatorService.set(new DataStreamLifecycleHealthIndicatorService(services.projectResolver()));

components.add(errorStoreInitialisationService.get());
components.add(dataLifecycleInitialisationService.get());
components.add(dataStreamLifecycleErrorsPublisher.get());
return components;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.elasticsearch.datastreams.lifecycle.downsampling.DeleteSourceAndAddDownsampleToDS;
import org.elasticsearch.datastreams.lifecycle.health.DataStreamLifecycleHealthInfoPublisher;
import org.elasticsearch.datastreams.lifecycle.transitions.steps.MarkIndexForDLMForceMergeAction;
import org.elasticsearch.dlm.DataStreamLifecycleErrorStore;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexMode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.dlm.DataStreamLifecycleErrorStore;

import static org.apache.logging.log4j.LogManager.getLogger;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleErrorStore;
import org.elasticsearch.dlm.DataStreamLifecycleErrorStore;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleErrorStore;
import org.elasticsearch.dlm.DataStreamLifecycleErrorStore;
import org.elasticsearch.health.node.DataStreamLifecycleHealthInfo;
import org.elasticsearch.health.node.DslErrorInfo;
import org.elasticsearch.health.node.UpdateHealthInfoCacheAction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import org.elasticsearch.action.datastreams.lifecycle.ErrorEntry;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.dlm.DataStreamLifecycleErrorStore;
import org.elasticsearch.health.node.DslErrorInfo;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
Expand All @@ -21,7 +22,7 @@
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleErrorStore.MAX_ERROR_MESSAGE_LENGTH;
import static org.elasticsearch.dlm.DataStreamLifecycleErrorStore.MAX_ERROR_MESSAGE_LENGTH;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasItem;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.datastreams.lifecycle.health.DataStreamLifecycleHealthInfoPublisher;
import org.elasticsearch.dlm.DataStreamLifecycleErrorStore;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.IndexSettings;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleErrorStore;
import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService;
import org.elasticsearch.dlm.DataStreamLifecycleErrorStore;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.test.ESTestCase;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleErrorStore;
import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService;
import org.elasticsearch.dlm.DataStreamLifecycleErrorStore;
import org.elasticsearch.health.node.DataStreamLifecycleHealthInfo;
import org.elasticsearch.health.node.DslErrorInfo;
import org.elasticsearch.health.node.UpdateHealthInfoCacheAction;
Expand Down
1 change: 1 addition & 0 deletions server/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -530,4 +530,5 @@
exports org.elasticsearch.search.diversification;
exports org.elasticsearch.search.diversification.mmr;
exports org.elasticsearch.inference.completion;
exports org.elasticsearch.dlm;
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.datastreams.lifecycle;
package org.elasticsearch.dlm;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.Message;
Comment on lines 12 to 13
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should these use the ES wrappers instead?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice if they did, but apparently the error store uses bits of the logger that our log manager does not support, so for now they have to remain using the log4j one.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
import org.elasticsearch.core.Tuple;
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.dlm.DataStreamLifecycleErrorStore;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.features.FeatureService;
Expand Down Expand Up @@ -1022,6 +1023,8 @@ public <T extends TransportResponse> void sendRequest(

final CrossProjectModeDecider crossProjectModeDecider = new CrossProjectModeDecider(settings);

final DataStreamLifecycleErrorStore dlmErrorStore = new DataStreamLifecycleErrorStore(threadPool::absoluteTimeInMillis);

PluginServiceInstances pluginServices = new PluginServiceInstances(
client,
clusterService,
Expand Down Expand Up @@ -1050,7 +1053,8 @@ public <T extends TransportResponse> void sendRequest(
linkedProjectConfigService,
projectRoutingResolver,
remoteTransportClient,
crossProjectModeDecider
crossProjectModeDecider,
dlmErrorStore
);

Collection<?> pluginComponents = pluginsService.flatMap(plugin -> {
Expand Down Expand Up @@ -1411,6 +1415,7 @@ public <T extends TransportResponse> void sendRequest(
b.bind(ProjectRoutingResolver.class).toInstance(projectRoutingResolver);
b.bind(ActionLoggingFieldsProvider.class).toInstance(loggingFieldsProvider);
b.bind(ActivityLogWriterProvider.class).toInstance(logWriterProvider);
b.bind(DataStreamLifecycleErrorStore.class).toInstance(dlmErrorStore);
});

if (ReadinessService.enabled(environment)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.dlm.DataStreamLifecycleErrorStore;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.features.FeatureService;
Expand Down Expand Up @@ -67,5 +68,6 @@ public record PluginServiceInstances(
LinkedProjectConfigService linkedProjectConfigService,
ProjectRoutingResolver projectRoutingResolver,
RemoteTransportClient remoteTransportClient,
CrossProjectModeDecider crossProjectModeDecider
CrossProjectModeDecider crossProjectModeDecider,
DataStreamLifecycleErrorStore dlmErrorStore
) implements Plugin.PluginServices {}
4 changes: 4 additions & 0 deletions server/src/main/java/org/elasticsearch/plugins/Plugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.dlm.DataStreamLifecycleErrorStore;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.features.FeatureService;
Expand Down Expand Up @@ -216,6 +217,9 @@ public interface PluginServices {

/** A service to determine whether Cross-Project Search applies to a request */
CrossProjectModeDecider crossProjectModeDecider();

/** A utility for recording lifecycle errors for data stream lifecycles */
DataStreamLifecycleErrorStore dlmErrorStore();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,11 @@ public String getIndexName() {
return indexName;
}

@Override
public ProjectId getProjectId() {
return projectId;
}

private ProjectState getProjectState() {
return clusterService.state().projectState(projectId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Strings;
import org.elasticsearch.dlm.DataStreamLifecycleErrorStore;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -42,8 +43,9 @@ class DlmFrozenTransitionExecutor implements Closeable {
private final ExecutorService executor;
private final int maxConcurrency;
private final int maxQueueSize;
private final DataStreamLifecycleErrorStore errorStore;

DlmFrozenTransitionExecutor(int maxConcurrency, int maxQueueSize, Settings settings) {
DlmFrozenTransitionExecutor(int maxConcurrency, int maxQueueSize, Settings settings, DataStreamLifecycleErrorStore errorStore) {
this.maxConcurrency = maxConcurrency;
this.maxQueueSize = maxQueueSize;
this.submittedTransitions = new ConcurrentHashMap<>(maxQueueSize);
Expand All @@ -56,6 +58,7 @@ class DlmFrozenTransitionExecutor implements Closeable {
}
return thread;
}, new ThreadContext(settings), EsExecutors.TaskTrackingConfig.DEFAULT);
this.errorStore = errorStore;
}

public boolean transitionSubmitted(String indexName) {
Expand Down Expand Up @@ -112,7 +115,13 @@ public void run() {
task.run();
logger.debug("Transition completed for index [{}]", indexName);
} catch (Exception ex) {
logger.error(() -> Strings.format("Error executing transition for index [%s]", indexName), ex);
errorStore.recordAndLogError(
task.getProjectId(),
indexName,
ex,
Strings.format("Error executing transition for index [%s]", indexName),
1
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want all errors logged at WARN or can we go with the same value as DATA_STREAM_SIGNALLING_ERROR_RETRY_INTERVAL_SETTING and stick to every 10th?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was matching the behavior for now, I think we need to know whether we want this to be the one-stop-shop for error handling (in which case, we'll need to move that setting into core and add code to make it dynamic here) or whether we want to do more error handling in the converter. Let's discuss it.

);
} finally {
submittedTransitions.remove(indexName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,12 @@ public Collection<?> createComponents(PluginServices services) {
Set<Object> components = new HashSet<>(super.createComponents(services));
if (DataStreamLifecycle.DLM_SEARCHABLE_SNAPSHOTS_FEATURE_FLAG.isEnabled()) {
XPackLicenseState licenseState = XPackPlugin.getSharedLicenseState();
var service = new DlmFrozenTransitionService(services.clusterService(), services.client(), licenseState);
var service = new DlmFrozenTransitionService(
services.clusterService(),
services.client(),
licenseState,
services.dlmErrorStore()
);
service.init();
components.add(service);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,13 @@

package org.elasticsearch.xpack.dlm.frozen;

import org.elasticsearch.cluster.metadata.ProjectId;

/**
* A runnable task associated with a specific index transition.
*/
interface DlmFrozenTransitionRunnable extends Runnable {
String getIndexName();

ProjectId getProjectId();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.dlm.DataStreamLifecycleErrorStore;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.index.Index;
import org.elasticsearch.license.XPackLicenseState;
Expand Down Expand Up @@ -76,14 +77,21 @@ class DlmFrozenTransitionService implements ClusterStateListener, Closeable {
private final int maxConcurrency;
private final int maxQueueSize;
private final long initialDelayMillis;
private final DataStreamLifecycleErrorStore errorStore;

private final BiFunction<String, ProjectId, DlmFrozenTransitionRunnable> transitionRunnableFactory;

DlmFrozenTransitionService(ClusterService clusterService, Client client, XPackLicenseState licenseState) {
DlmFrozenTransitionService(
ClusterService clusterService,
Client client,
XPackLicenseState licenseState,
DataStreamLifecycleErrorStore errorStore
) {
this(
clusterService,
(index, pid) -> new DataStreamLifecycleConvertToFrozen(index, pid, client, clusterService, licenseState),
POLL_INTERVAL_SETTING.get(clusterService.getSettings()).millis()
POLL_INTERVAL_SETTING.get(clusterService.getSettings()).millis(),
errorStore
);
}

Expand All @@ -92,20 +100,22 @@ class DlmFrozenTransitionService implements ClusterStateListener, Closeable {
ClusterService clusterService,
BiFunction<String, ProjectId, DlmFrozenTransitionRunnable> transitionRunnableFactory
) {
this(clusterService, transitionRunnableFactory, 0);
this(clusterService, transitionRunnableFactory, 0, new DataStreamLifecycleErrorStore(System::currentTimeMillis));
}

private DlmFrozenTransitionService(
ClusterService clusterService,
BiFunction<String, ProjectId, DlmFrozenTransitionRunnable> transitionRunnableFactory,
long initialDelayMillis
long initialDelayMillis,
DataStreamLifecycleErrorStore errorStore
) {
this.clusterService = clusterService;
this.pollInterval = POLL_INTERVAL_SETTING.get(clusterService.getSettings());
this.maxConcurrency = MAX_CONCURRENCY_SETTING.get(clusterService.getSettings());
this.maxQueueSize = MAX_QUEUE_SIZE.get(clusterService.getSettings());
this.transitionRunnableFactory = transitionRunnableFactory;
this.initialDelayMillis = initialDelayMillis;
this.errorStore = errorStore;
}

/**
Expand Down Expand Up @@ -135,7 +145,12 @@ public void clusterChanged(ClusterChangedEvent event) {
private void startThreadPools() {
synchronized (this) {
if (closing.get() == false) {
transitionExecutor = new DlmFrozenTransitionExecutor(maxConcurrency, maxQueueSize, clusterService.getSettings());
transitionExecutor = new DlmFrozenTransitionExecutor(
maxConcurrency,
maxQueueSize,
clusterService.getSettings(),
errorStore
);
schedulerThreadExecutor = Executors.newSingleThreadScheduledExecutor(
EsExecutors.daemonThreadFactory(clusterService.getSettings(), "dlm-frozen-transition-scheduler")
);
Expand Down
Loading
Loading