Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -368,9 +368,6 @@ public Step getStep(final IndexMetadata indexMetadata, final Step.StepKey stepKe
final Step s = phaseSteps.stream().filter(step -> step.getKey().equals(stepKey)).findFirst().orElse(null);
if (s != null) {
cachedSteps.put(indexMetadata.getIndex(), Tuple.tuple(indexMetadata, s));
// assert that the cache works as expected -- that is, if we put something into the cache,
// we should get back the same thing if we were to invoke getStep again with the same arguments
assert s == getCachedStep(indexMetadata, stepKey) : "policy step registry cache failed sanity check";
}
return s;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.cluster.metadata.LifecycleExecutionState.ILM_CUSTOM_METADATA_KEY;
import static org.hamcrest.Matchers.containsString;
Expand Down Expand Up @@ -465,4 +467,71 @@ public void testUpdatePolicyButNoPhaseChangeIndexStepsDontChange() throws Except
assertThat(((ShrinkStep) shrinkStep).getNumberOfShards(), equalTo(2));
assertThat(((ShrinkStep) gotStep).getNumberOfShards(), equalTo(1));
}

public void testGetStepMultithreaded() throws Exception {
Client client = mock(Client.class);
Mockito.when(client.settings()).thenReturn(Settings.EMPTY);

LifecyclePolicy policy = LifecyclePolicyTests.randomTimeseriesLifecyclePolicyWithAllPhases("policy");
String phaseName = randomFrom(policy.getPhases().keySet());
Phase phase = policy.getPhases().get(phaseName);

LifecycleExecutionState lifecycleState = LifecycleExecutionState.builder()
.setPhaseDefinition(Strings.toString(new PhaseExecutionInfo(policy.getName(), phase, 1, randomNonNegativeLong())))
.build();
IndexMetadata indexMetadata = IndexMetadata.builder("test")
.settings(
Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
.put("index.version.created", Version.CURRENT)
.put(LifecycleSettings.LIFECYCLE_NAME, "policy")
.build()
)
.putCustom(ILM_CUSTOM_METADATA_KEY, lifecycleState.asMap())
.build();

SortedMap<String, LifecyclePolicyMetadata> metas = new TreeMap<>();
metas.put("policy", new LifecyclePolicyMetadata(policy, Collections.emptyMap(), 1, randomNonNegativeLong()));
IndexLifecycleMetadata meta = new IndexLifecycleMetadata(metas, OperationMode.RUNNING);

PolicyStepsRegistry registry = new PolicyStepsRegistry(REGISTRY, client, null);
registry.update(meta);

// test a variety of getStep calls with random actions and steps
for (int i = 0; i < scaledRandomIntBetween(100, 1000); i++) {
LifecycleAction action = randomValueOtherThan(MigrateAction.DISABLED, () -> randomFrom(phase.getActions().values()));
Step step = randomFrom(action.toSteps(client, phaseName, MOCK_STEP_KEY, null));
Step actualStep = registry.getStep(indexMetadata, step.getKey());
assertThat(actualStep.getKey(), equalTo(step.getKey()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit confused as to what this tests. There'll be a new step added, by-passing the cache, on every iteration - I'm quite confused specifically as we don't have any steps defined/registered before, yet we do setup some metadatas (both IndexMetadata and IndexLifecycleMetadata). Apologies if I'm missing something very obvious but could we document the intent here? (I'm guessing we want to populate the cache? )

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough -- I spent a bit of time stressing over this exact point. It's a little bit tricky, I don't think you're missing something obvious. I'll add some comments and let's see where that gets us.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

final CountDownLatch latch = new CountDownLatch(1);
final AtomicBoolean done = new AtomicBoolean(false);

// now, in another thread, update the registry repeatedly as fast as possible.
// updating the registry has the side effect of clearing the cache.
new Thread(() -> {
latch.countDown(); // signal that we're starting
while (done.get() == false) {
registry.update(meta);
}
}).start();

try {
latch.await(); // wait until the other thread started

// and, while the cache is being repeatedly cleared,
// test a variety of getStep calls with random actions and steps
for (int i = 0; i < scaledRandomIntBetween(100, 1000); i++) {
LifecycleAction action = randomValueOtherThan(MigrateAction.DISABLED, () -> randomFrom(phase.getActions().values()));
Step step = randomFrom(action.toSteps(client, phaseName, MOCK_STEP_KEY, null));
Step actualStep = registry.getStep(indexMetadata, step.getKey());
assertThat(actualStep.getKey(), equalTo(step.getKey()));
}
} finally {
// tell the other thread we're finished
done.set(true);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Super minor nit, but in the event this is the very last test run in a suite (or it's just run by itself), it's possible our thread leak detector will complain since we aren't doing a thread.join() on the thread we spawn before to wait for it to be completely done. Do you think we should add that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure thing, I'll get that in.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}
}
}