Skip to content

Commit

Permalink
Fix topic operator loop for unmanaged topics
Browse files Browse the repository at this point in the history
Signed-off-by: Pablo Díaz <[email protected]>
  • Loading branch information
padilo committed Aug 30, 2024
1 parent f3b8493 commit 3bcb22a
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
import io.strimzi.api.kafka.model.common.ConditionBuilder;
import io.strimzi.api.kafka.model.topic.KafkaTopic;
import io.strimzi.api.kafka.model.topic.KafkaTopicBuilder;
import io.strimzi.api.kafka.model.topic.KafkaTopicStatus;
import io.strimzi.api.kafka.model.topic.KafkaTopicStatusBuilder;
import io.strimzi.operator.common.Reconciliation;
import io.strimzi.operator.common.ReconciliationLogger;
import io.strimzi.operator.common.model.StatusDiff;
import io.strimzi.operator.common.model.StatusUtils;
import io.strimzi.operator.topic.metrics.TopicOperatorMetricsHolder;
import io.strimzi.operator.topic.model.Either;
Expand Down Expand Up @@ -186,7 +186,7 @@ static String resourceVersion(KafkaTopic kt) {
private List<ReconcilableTopic> addOrRemoveFinalizer(boolean useFinalizer, List<ReconcilableTopic> reconcilableTopics) {
List<ReconcilableTopic> collect = reconcilableTopics.stream()
.map(reconcilableTopic ->
new ReconcilableTopic(reconcilableTopic.reconciliation(), useFinalizer
new ReconcilableTopic(reconcilableTopic.reconciliation(), useFinalizer
? addFinalizer(reconcilableTopic) : removeFinalizer(reconcilableTopic), reconcilableTopic.topicName()))
.collect(Collectors.toList());
LOGGER.traceOp("{} {} topics", useFinalizer ? "Added finalizers to" : "Removed finalizers from", reconcilableTopics.size());
Expand Down Expand Up @@ -1132,7 +1132,7 @@ private void updateStatusForSuccess(ReconcilableTopic reconcilableTopic) {
} else if (TopicOperatorUtil.isPaused(reconcilableTopic.kt())) {
conditionType = "ReconciliationPaused";
}

conditions.add(new ConditionBuilder()
.withType(conditionType)
.withStatus("True")
Expand Down Expand Up @@ -1210,10 +1210,10 @@ private void updateStatus(ReconcilableTopic reconcilableTopic) {
var oldStatus = Crds.topicOperation(kubeClient)
.inNamespace(reconcilableTopic.kt().getMetadata().getNamespace())
.withName(reconcilableTopic.kt().getMetadata().getName()).get().getStatus();

// the observedGeneration is a marker that shows that the operator works and that it saw the last update to the resource
reconcilableTopic.kt().getStatus().setObservedGeneration(reconcilableTopic.kt().getMetadata().getGeneration());

// set or reset the topicName
reconcilableTopic.kt().getStatus().setTopicName(
!TopicOperatorUtil.isManaged(reconcilableTopic.kt())
Expand All @@ -1222,8 +1222,9 @@ private void updateStatus(ReconcilableTopic reconcilableTopic) {
? oldStatus.getTopicName()
: TopicOperatorUtil.topicName(reconcilableTopic.kt())
);

if (statusChanged(reconcilableTopic.kt(), oldStatus)) {

StatusDiff statusDiff = new StatusDiff(oldStatus, reconcilableTopic.kt().getStatus());
if (!statusDiff.isEmpty()) {
try {
var updatedTopic = new KafkaTopicBuilder(reconcilableTopic.kt())
.editOrNewMetadata()
Expand All @@ -1242,49 +1243,4 @@ private void updateStatus(ReconcilableTopic reconcilableTopic) {
}
}
}

private boolean statusChanged(KafkaTopic kt, KafkaTopicStatus oldStatus) {
return oldStatusOrTopicNameMissing(oldStatus)
|| nonPausedAndDifferentGenerations(kt, oldStatus)
|| differentConditions(kt.getStatus().getConditions(), oldStatus.getConditions())
|| replicasChangesDiffer(kt, oldStatus);
}

private boolean oldStatusOrTopicNameMissing(KafkaTopicStatus oldStatus) {
return oldStatus == null || oldStatus.getTopicName() == null;
}

private boolean nonPausedAndDifferentGenerations(KafkaTopic kt, KafkaTopicStatus oldStatus) {
return !TopicOperatorUtil.isPaused(kt) && oldStatus.getObservedGeneration() != kt.getMetadata().getGeneration();
}

private boolean differentConditions(List<Condition> newConditions, List<Condition> oldConditions) {
if (Objects.equals(newConditions, oldConditions)) {
return false;
} else if (newConditions == null || oldConditions == null || newConditions.size() != oldConditions.size()) {
return true;
} else {
for (int i = 0; i < newConditions.size(); i++) {
if (conditionsDiffer(newConditions.get(i), oldConditions.get(i))) {
return true;
}
}
}
return false;
}

private boolean conditionsDiffer(Condition newCondition, Condition oldCondition) {
return !Objects.equals(newCondition.getType(), oldCondition.getType())
|| !Objects.equals(newCondition.getStatus(), oldCondition.getStatus())
|| !Objects.equals(newCondition.getReason(), oldCondition.getReason())
|| !Objects.equals(newCondition.getMessage(), oldCondition.getMessage());
}

@SuppressWarnings("BooleanExpressionComplexity")
private boolean replicasChangesDiffer(KafkaTopic kt, KafkaTopicStatus oldStatus) {
return kt.getStatus().getReplicasChange() == null && oldStatus.getReplicasChange() != null
|| kt.getStatus().getReplicasChange() != null && oldStatus.getReplicasChange() == null
|| (kt.getStatus().getReplicasChange() != null && oldStatus.getReplicasChange() != null
&& !Objects.equals(kt.getStatus().getReplicasChange(), oldStatus.getReplicasChange()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -244,11 +244,23 @@ private static Predicate<KafkaTopic> readyIsFalseAndReasonIs(String requiredReas
}

private static Predicate<KafkaTopic> readyIsTrueOrFalse() {
return typeIsTrueOrFalse("Ready");
}

private static Predicate<KafkaTopic> unmanagedIsTrueOrFalse() {
return typeIsTrueOrFalse("Unmanaged");
}

private static Predicate<KafkaTopic> typeIsTrueOrFalse(String type) {
Predicate<Condition> conditionPredicate = condition ->
"Ready".equals(condition.getType())
&& "True".equals(condition.getStatus())
|| "False".equals(condition.getStatus());
return isReconcilatedAndHasConditionMatching("Ready=True or False", conditionPredicate);
type.equals(condition.getType())
&& "True".equals(condition.getStatus())
|| "False".equals(condition.getStatus());
return isReconcilatedAndHasConditionMatching(type + "=True or False", conditionPredicate);
}

private static Predicate<KafkaTopic> unmanagedStatusTrue() {
return typeIsTrueOrFalse("Unmanaged");
}

private static Predicate<KafkaTopic> unmanagedIsTrue() {
Expand Down Expand Up @@ -665,7 +677,7 @@ public void shouldNotCreateTopicInKafkaWhenUnmanagedTopicCreatedInKube(
// given

// when
var reconciled = createTopic(kafkaCluster, kt);
var reconciled = createTopic(kafkaCluster, kt, unmanagedStatusTrue());

// then
assertNull(reconciled.getStatus().getTopicName());
Expand Down Expand Up @@ -1137,7 +1149,7 @@ public void shouldRestoreFinalizerIfRemoved(
@BrokerConfig(name = "auto.create.topics.enable", value = "false")
KafkaCluster kafkaCluster) throws ExecutionException, InterruptedException, TimeoutException {
// given
var created = createTopic(kafkaCluster, kt);
var created = createTopic(kafkaCluster, kt, TopicOperatorUtil.isManaged(kt) ? readyIsTrueOrFalse() : unmanagedIsTrueOrFalse());
if (TopicOperatorUtil.isManaged(kt)) {
assertCreateSuccess(kt, created);
}
Expand Down Expand Up @@ -2153,4 +2165,63 @@ public void shouldReconcileOnTopicExistsException(
KafkaTopic kafkaTopic = createTopic(kafkaCluster, kafkaTopic(NAMESPACE, topicName, true, topicName, 2, 1));
assertTrue(readyIsTrue().test(kafkaTopic));
}

@Test
public void shouldUpdateAnUnmanagedTopic(
@BrokerConfig(name = "auto.create.topics.enable", value = "false")
KafkaCluster kafkaCluster) throws ExecutionException, InterruptedException {
var topicName = "my-topic";

// create the topic
var topic = createTopic(kafkaCluster,
kafkaTopic(NAMESPACE, topicName, SELECTOR, null, null, topicName, 1, 1,
Map.of(TopicConfig.RETENTION_MS_CONFIG, "1000")));
topic = Crds.topicOperation(kubernetesClient).resource(topic).get();

TopicOperatorTestUtil.waitUntilCondition(Crds.topicOperation(kubernetesClient).resource(topic), kt ->
Optional.of(kt)
.map(KafkaTopic::getStatus)
.map(KafkaTopicStatus::getConditions)
.flatMap(c -> Optional.of(c.get(0)))
.map(Condition::getType)
.filter("Ready"::equals)
.isPresent()
);

// set unmanaged
topic = Crds.topicOperation(kubernetesClient).resource(topic).get();
topic.setStatus(null);
topic.getMetadata().getAnnotations().put(TopicOperatorUtil.MANAGED, "false");
topic = Crds.topicOperation(kubernetesClient).resource(topic).update();

TopicOperatorTestUtil.waitUntilCondition(Crds.topicOperation(kubernetesClient).resource(topic), kt ->
Optional.of(kt)
.map(KafkaTopic::getStatus)
.map(KafkaTopicStatus::getConditions)
.flatMap(c -> Optional.of(c.get(0)))
.map(Condition::getType)
.filter("Unmanaged"::equals)
.isPresent()
);

// apply a change to the unmanaged topic
topic = Crds.topicOperation(kubernetesClient).resource(topic).get();
topic.setStatus(null);
topic.getSpec().getConfig().put(TopicConfig.RETENTION_MS_CONFIG, "1001");
topic = Crds.topicOperation(kubernetesClient).resource(topic).update();
var resourceVersionOnUpdate = topic.getMetadata().getResourceVersion();

TopicOperatorTestUtil.waitUntilCondition(Crds.topicOperation(kubernetesClient).resource(topic), kt ->
!resourceVersionOnUpdate.equals(kt.getMetadata().getResourceVersion())
);
topic = Crds.topicOperation(kubernetesClient).resource(topic).get();
var resourceVersionAfterUpdate = topic.getMetadata().getResourceVersion();

// Wait a bit to check the resource is not getting updated continuously
Thread.sleep(500L);
TopicOperatorTestUtil.waitUntilCondition(Crds.topicOperation(kubernetesClient).resource(topic), kt ->
resourceVersionAfterUpdate.equals(kt.getMetadata().getResourceVersion())
);
}

}

0 comments on commit 3bcb22a

Please sign in to comment.