Skip to content

Commit

Permalink
Fix topic operator loop for unmanaged topics
Browse files Browse the repository at this point in the history
Signed-off-by: Pablo Díaz <[email protected]>
  • Loading branch information
padilo committed Aug 14, 2024
1 parent 73f5d03 commit a4abdbf
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1195,6 +1195,9 @@ private void updateStatusForException(ReconcilableTopic reconcilableTopic, Excep
}

private void updateStatus(ReconcilableTopic reconcilableTopic) {
if (!TopicOperatorUtil.isManaged(reconcilableTopic.kt())) {
return;
}
var oldStatus = Crds.topicOperation(kubeClient)
.inNamespace(reconcilableTopic.kt().getMetadata().getNamespace())
.withName(reconcilableTopic.kt().getMetadata().getName()).get().getStatus();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
import java.util.stream.Collectors;

import static java.lang.String.format;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
Expand Down Expand Up @@ -2131,4 +2132,35 @@ public void shouldReconcileOnTopicExistsException(
KafkaTopic kafkaTopic = createTopic(kafkaCluster, kafkaTopic(NAMESPACE, topicName, true, topicName, 2, 1));
assertTrue(readyIsTrue().test(kafkaTopic));
}

@Test
public void shouldUpdateAnUnmanagedTopic(
@BrokerConfig(name = "auto.create.topics.enable", value = "false")
KafkaCluster kafkaCluster) throws ExecutionException, InterruptedException {
var topicName = "my-topic";

// create the topic
var topic = createTopic(kafkaCluster,
kafkaTopic(NAMESPACE, topicName, SELECTOR, null, null, topicName, 1, 1,
Map.of(TopicConfig.RETENTION_MS_CONFIG, "1000")));
topic = Crds.topicOperation(kubernetesClient).resource(topic).get();

// set unmanaged
topic = Crds.topicOperation(kubernetesClient).resource(topic).get();
topic.getMetadata().getAnnotations().put(TopicOperatorUtil.MANAGED, "false");
topic = Crds.topicOperation(kubernetesClient).resource(topic).update();

// apply a change to the unmanaged topic
topic = Crds.topicOperation(kubernetesClient).resource(topic).get();
topic.getSpec().getConfig().put(TopicConfig.RETENTION_MS_CONFIG, "1001");
topic = Crds.topicOperation(kubernetesClient).resource(topic).update();

// verify resource is not getting update
var currentResourceVersion = topic.getMetadata().getResourceVersion();
await().during(500, TimeUnit.MILLISECONDS).until(
Crds.topicOperation(kubernetesClient).resource(topic)::get,
kt -> currentResourceVersion.equals(kt.getMetadata().getResourceVersion())
);
}

}

0 comments on commit a4abdbf

Please sign in to comment.