diff --git a/topic-operator/src/main/java/io/strimzi/operator/topic/BatchingTopicController.java b/topic-operator/src/main/java/io/strimzi/operator/topic/BatchingTopicController.java index 0e37ae20442..bfc33bc33e0 100644 --- a/topic-operator/src/main/java/io/strimzi/operator/topic/BatchingTopicController.java +++ b/topic-operator/src/main/java/io/strimzi/operator/topic/BatchingTopicController.java @@ -11,10 +11,10 @@ import io.strimzi.api.kafka.model.common.ConditionBuilder; import io.strimzi.api.kafka.model.topic.KafkaTopic; import io.strimzi.api.kafka.model.topic.KafkaTopicBuilder; -import io.strimzi.api.kafka.model.topic.KafkaTopicStatus; import io.strimzi.api.kafka.model.topic.KafkaTopicStatusBuilder; import io.strimzi.operator.common.Reconciliation; import io.strimzi.operator.common.ReconciliationLogger; +import io.strimzi.operator.common.model.StatusDiff; import io.strimzi.operator.common.model.StatusUtils; import io.strimzi.operator.topic.metrics.TopicOperatorMetricsHolder; import io.strimzi.operator.topic.model.Either; @@ -186,7 +186,7 @@ static String resourceVersion(KafkaTopic kt) { private List addOrRemoveFinalizer(boolean useFinalizer, List reconcilableTopics) { List collect = reconcilableTopics.stream() .map(reconcilableTopic -> - new ReconcilableTopic(reconcilableTopic.reconciliation(), useFinalizer + new ReconcilableTopic(reconcilableTopic.reconciliation(), useFinalizer ? addFinalizer(reconcilableTopic) : removeFinalizer(reconcilableTopic), reconcilableTopic.topicName())) .collect(Collectors.toList()); LOGGER.traceOp("{} {} topics", useFinalizer ? "Added finalizers to" : "Removed finalizers from", reconcilableTopics.size()); @@ -1132,7 +1132,7 @@ private void updateStatusForSuccess(ReconcilableTopic reconcilableTopic) { } else if (TopicOperatorUtil.isPaused(reconcilableTopic.kt())) { conditionType = "ReconciliationPaused"; } - + conditions.add(new ConditionBuilder() .withType(conditionType) .withStatus("True") @@ -1210,10 +1210,10 @@ private void updateStatus(ReconcilableTopic reconcilableTopic) { var oldStatus = Crds.topicOperation(kubeClient) .inNamespace(reconcilableTopic.kt().getMetadata().getNamespace()) .withName(reconcilableTopic.kt().getMetadata().getName()).get().getStatus(); - + // the observedGeneration is a marker that shows that the operator works and that it saw the last update to the resource reconcilableTopic.kt().getStatus().setObservedGeneration(reconcilableTopic.kt().getMetadata().getGeneration()); - + // set or reset the topicName reconcilableTopic.kt().getStatus().setTopicName( !TopicOperatorUtil.isManaged(reconcilableTopic.kt()) @@ -1222,8 +1222,9 @@ private void updateStatus(ReconcilableTopic reconcilableTopic) { ? oldStatus.getTopicName() : TopicOperatorUtil.topicName(reconcilableTopic.kt()) ); - - if (statusChanged(reconcilableTopic.kt(), oldStatus)) { + + StatusDiff statusDiff = new StatusDiff(oldStatus, reconcilableTopic.kt().getStatus()); + if (!statusDiff.isEmpty()) { try { var updatedTopic = new KafkaTopicBuilder(reconcilableTopic.kt()) .editOrNewMetadata() @@ -1242,49 +1243,4 @@ private void updateStatus(ReconcilableTopic reconcilableTopic) { } } } - - private boolean statusChanged(KafkaTopic kt, KafkaTopicStatus oldStatus) { - return oldStatusOrTopicNameMissing(oldStatus) - || nonPausedAndDifferentGenerations(kt, oldStatus) - || differentConditions(kt.getStatus().getConditions(), oldStatus.getConditions()) - || replicasChangesDiffer(kt, oldStatus); - } - - private boolean oldStatusOrTopicNameMissing(KafkaTopicStatus oldStatus) { - return oldStatus == null || oldStatus.getTopicName() == null; - } - - private boolean nonPausedAndDifferentGenerations(KafkaTopic kt, KafkaTopicStatus oldStatus) { - return !TopicOperatorUtil.isPaused(kt) && oldStatus.getObservedGeneration() != kt.getMetadata().getGeneration(); - } - - private boolean differentConditions(List newConditions, List oldConditions) { - if (Objects.equals(newConditions, oldConditions)) { - return false; - } else if (newConditions == null || oldConditions == null || newConditions.size() != oldConditions.size()) { - return true; - } else { - for (int i = 0; i < newConditions.size(); i++) { - if (conditionsDiffer(newConditions.get(i), oldConditions.get(i))) { - return true; - } - } - } - return false; - } - - private boolean conditionsDiffer(Condition newCondition, Condition oldCondition) { - return !Objects.equals(newCondition.getType(), oldCondition.getType()) - || !Objects.equals(newCondition.getStatus(), oldCondition.getStatus()) - || !Objects.equals(newCondition.getReason(), oldCondition.getReason()) - || !Objects.equals(newCondition.getMessage(), oldCondition.getMessage()); - } - - @SuppressWarnings("BooleanExpressionComplexity") - private boolean replicasChangesDiffer(KafkaTopic kt, KafkaTopicStatus oldStatus) { - return kt.getStatus().getReplicasChange() == null && oldStatus.getReplicasChange() != null - || kt.getStatus().getReplicasChange() != null && oldStatus.getReplicasChange() == null - || (kt.getStatus().getReplicasChange() != null && oldStatus.getReplicasChange() != null - && !Objects.equals(kt.getStatus().getReplicasChange(), oldStatus.getReplicasChange())); - } } diff --git a/topic-operator/src/test/java/io/strimzi/operator/topic/TopicControllerIT.java b/topic-operator/src/test/java/io/strimzi/operator/topic/TopicControllerIT.java index 23cee8bc9de..9d634f31a01 100644 --- a/topic-operator/src/test/java/io/strimzi/operator/topic/TopicControllerIT.java +++ b/topic-operator/src/test/java/io/strimzi/operator/topic/TopicControllerIT.java @@ -244,11 +244,23 @@ private static Predicate readyIsFalseAndReasonIs(String requiredReas } private static Predicate readyIsTrueOrFalse() { + return typeIsTrueOrFalse("Ready"); + } + + private static Predicate unmanagedIsTrueOrFalse() { + return typeIsTrueOrFalse("Unmanaged"); + } + + private static Predicate typeIsTrueOrFalse(String type) { Predicate conditionPredicate = condition -> - "Ready".equals(condition.getType()) - && "True".equals(condition.getStatus()) - || "False".equals(condition.getStatus()); - return isReconcilatedAndHasConditionMatching("Ready=True or False", conditionPredicate); + type.equals(condition.getType()) + && "True".equals(condition.getStatus()) + || "False".equals(condition.getStatus()); + return isReconcilatedAndHasConditionMatching(type + "=True or False", conditionPredicate); + } + + private static Predicate unmanagedStatusTrue() { + return typeIsTrueOrFalse("Unmanaged"); } private static Predicate unmanagedIsTrue() { @@ -665,7 +677,7 @@ public void shouldNotCreateTopicInKafkaWhenUnmanagedTopicCreatedInKube( // given // when - var reconciled = createTopic(kafkaCluster, kt); + var reconciled = createTopic(kafkaCluster, kt, unmanagedStatusTrue()); // then assertNull(reconciled.getStatus().getTopicName()); @@ -1137,7 +1149,7 @@ public void shouldRestoreFinalizerIfRemoved( @BrokerConfig(name = "auto.create.topics.enable", value = "false") KafkaCluster kafkaCluster) throws ExecutionException, InterruptedException, TimeoutException { // given - var created = createTopic(kafkaCluster, kt); + var created = createTopic(kafkaCluster, kt, TopicOperatorUtil.isManaged(kt) ? readyIsTrueOrFalse() : unmanagedIsTrueOrFalse()); if (TopicOperatorUtil.isManaged(kt)) { assertCreateSuccess(kt, created); } @@ -2153,4 +2165,63 @@ public void shouldReconcileOnTopicExistsException( KafkaTopic kafkaTopic = createTopic(kafkaCluster, kafkaTopic(NAMESPACE, topicName, true, topicName, 2, 1)); assertTrue(readyIsTrue().test(kafkaTopic)); } + + @Test + public void shouldUpdateAnUnmanagedTopic( + @BrokerConfig(name = "auto.create.topics.enable", value = "false") + KafkaCluster kafkaCluster) throws ExecutionException, InterruptedException { + var topicName = "my-topic"; + + // create the topic + var topic = createTopic(kafkaCluster, + kafkaTopic(NAMESPACE, topicName, SELECTOR, null, null, topicName, 1, 1, + Map.of(TopicConfig.RETENTION_MS_CONFIG, "1000"))); + topic = Crds.topicOperation(kubernetesClient).resource(topic).get(); + + TopicOperatorTestUtil.waitUntilCondition(Crds.topicOperation(kubernetesClient).resource(topic), kt -> + Optional.of(kt) + .map(KafkaTopic::getStatus) + .map(KafkaTopicStatus::getConditions) + .flatMap(c -> Optional.of(c.get(0))) + .map(Condition::getType) + .filter("Ready"::equals) + .isPresent() + ); + + // set unmanaged + topic = Crds.topicOperation(kubernetesClient).resource(topic).get(); + topic.setStatus(null); + topic.getMetadata().getAnnotations().put(TopicOperatorUtil.MANAGED, "false"); + topic = Crds.topicOperation(kubernetesClient).resource(topic).update(); + + TopicOperatorTestUtil.waitUntilCondition(Crds.topicOperation(kubernetesClient).resource(topic), kt -> + Optional.of(kt) + .map(KafkaTopic::getStatus) + .map(KafkaTopicStatus::getConditions) + .flatMap(c -> Optional.of(c.get(0))) + .map(Condition::getType) + .filter("Unmanaged"::equals) + .isPresent() + ); + + // apply a change to the unmanaged topic + topic = Crds.topicOperation(kubernetesClient).resource(topic).get(); + topic.setStatus(null); + topic.getSpec().getConfig().put(TopicConfig.RETENTION_MS_CONFIG, "1001"); + topic = Crds.topicOperation(kubernetesClient).resource(topic).update(); + var resourceVersionOnUpdate = topic.getMetadata().getResourceVersion(); + + TopicOperatorTestUtil.waitUntilCondition(Crds.topicOperation(kubernetesClient).resource(topic), kt -> + !resourceVersionOnUpdate.equals(kt.getMetadata().getResourceVersion()) + ); + topic = Crds.topicOperation(kubernetesClient).resource(topic).get(); + var resourceVersionAfterUpdate = topic.getMetadata().getResourceVersion(); + + // Wait a bit to check the resource is not getting updated continuously + Thread.sleep(500L); + TopicOperatorTestUtil.waitUntilCondition(Crds.topicOperation(kubernetesClient).resource(topic), kt -> + resourceVersionAfterUpdate.equals(kt.getMetadata().getResourceVersion()) + ); + } + }