Skip to content

Commit

Permalink
[fix][broker] fixed schema deletion erorr when deleting a partitioned…
Browse files Browse the repository at this point in the history
… topic and when the topic's reference is not ready
  • Loading branch information
heesung-sn committed Jan 27, 2024
1 parent d37d31f commit 73a27c5
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,6 @@
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.SystemTopic;
import org.apache.pulsar.broker.service.plugin.EntryFilterProvider;
import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.prometheus.metrics.ObserverGauge;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
Expand Down Expand Up @@ -3475,22 +3473,21 @@ public CompletableFuture<Void> deleteTopicPolicies(TopicName topicName) {
}

public CompletableFuture<SchemaVersion> deleteSchema(TopicName topicName) {
// delete schema at the upper level when deleting the partitioned topic.
if (topicName.isPartitioned()) {
return CompletableFuture.completedFuture(null);
}
String base = topicName.getPartitionedTopicName();
String id = TopicName.get(base).getSchemaName();
SchemaRegistryService schemaRegistryService = getPulsar().getSchemaRegistryService();
return BookkeeperSchemaStorage.ignoreUnrecoverableBKException(schemaRegistryService.getSchema(id))
.thenCompose(schema -> {
if (schema != null) {
// It's different from `SchemasResource.deleteSchema`
// because when we delete a topic, the schema
// history is meaningless. But when we delete a schema of a topic, a new schema could be
// registered in the future.
log.info("Delete schema storage of id: {}", id);
return getPulsar().getSchemaRegistryService().deleteSchemaStorage(id);
} else {
return CompletableFuture.completedFuture(null);
}
});
return getPulsar().getSchemaRegistryService().deleteSchemaStorage(id).whenComplete((vid, ex) -> {
if (vid != null && ex == null) {
// It's different from `SchemasResource.deleteSchema`
// because when we delete a topic, the schema
// history is meaningless. But when we delete a schema of a topic, a new schema could be
// registered in the future.
log.info("Deleted schema storage of id: {}", id);
}
});
}

private CompletableFuture<Void> checkMaxTopicsPerNamespace(TopicName topicName, int numPartitions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,8 @@ public static Exception bkException(String operation, int rc, long ledgerId, lon
message += " - entry=" + entryId;
}
boolean recoverable = rc != BKException.Code.NoSuchLedgerExistsException
&& rc != BKException.Code.NoSuchEntryException;
&& rc != BKException.Code.NoSuchEntryException
&& rc != BKException.Code.NoSuchLedgerExistsOnMetadataServerException;
return new SchemaException(recoverable, message);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.tests.integration.schema.Schemas.Person;
import org.apache.pulsar.tests.integration.schema.Schemas.PersonConsumeSchema;
import org.apache.pulsar.tests.integration.schema.Schemas.Student;
Expand Down Expand Up @@ -316,5 +318,14 @@ public void testPrimitiveSchemaTypeCompatibilityCheck() {

}

@Test
public void testDeletePartitionedTopicWhenTopicReferenceIsNotReady() throws Exception {
final String topic = "persistent://public/default/tp-ref";
admin.topics().createPartitionedTopic(topic, 20);
admin.schemas().createSchema(topic,
SchemaInfo.builder().type(SchemaType.STRING).schema(new byte[0]).build());
admin.topics().deletePartitionedTopic(topic, false);
}

}

0 comments on commit 73a27c5

Please sign in to comment.