Skip to content

Commit

Permalink
[fix][broker] fixed schema deletion erorr when deleting a partitioned…
Browse files Browse the repository at this point in the history
… topic and when the topic's reference is not ready
  • Loading branch information
heesung-sn committed Jan 27, 2024
1 parent d37d31f commit 5563eb0
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,6 @@
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.SystemTopic;
import org.apache.pulsar.broker.service.plugin.EntryFilterProvider;
import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.prometheus.metrics.ObserverGauge;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
Expand Down Expand Up @@ -3475,22 +3473,21 @@ public CompletableFuture<Void> deleteTopicPolicies(TopicName topicName) {
}

public CompletableFuture<SchemaVersion> deleteSchema(TopicName topicName) {
// delete schema at the upper level when deleting the partitioned topic.
if (topicName.isPartitioned()) {
return CompletableFuture.completedFuture(null);
}
String base = topicName.getPartitionedTopicName();
String id = TopicName.get(base).getSchemaName();
SchemaRegistryService schemaRegistryService = getPulsar().getSchemaRegistryService();
return BookkeeperSchemaStorage.ignoreUnrecoverableBKException(schemaRegistryService.getSchema(id))
.thenCompose(schema -> {
if (schema != null) {
// It's different from `SchemasResource.deleteSchema`
// because when we delete a topic, the schema
// history is meaningless. But when we delete a schema of a topic, a new schema could be
// registered in the future.
log.info("Delete schema storage of id: {}", id);
return getPulsar().getSchemaRegistryService().deleteSchemaStorage(id);
} else {
return CompletableFuture.completedFuture(null);
}
});
return getPulsar().getSchemaRegistryService().deleteSchemaStorage(id).whenComplete((vid, ex) -> {
if (vid != null && ex == null) {
// It's different from `SchemasResource.deleteSchema`
// because when we delete a topic, the schema
// history is meaningless. But when we delete a schema of a topic, a new schema could be
// registered in the future.
log.info("Deleted schema storage of id: {}", id);
}
});
}

private CompletableFuture<Void> checkMaxTopicsPerNamespace(TopicName topicName, int numPartitions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,8 @@ public static Exception bkException(String operation, int rc, long ledgerId, lon
message += " - entry=" + entryId;
}
boolean recoverable = rc != BKException.Code.NoSuchLedgerExistsException
&& rc != BKException.Code.NoSuchEntryException;
&& rc != BKException.Code.NoSuchEntryException
&& rc != BKException.Code.NoSuchLedgerExistsOnMetadataServerException;
return new SchemaException(recoverable, message);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,29 @@
import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;

import com.google.common.collect.Sets;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.NotFoundException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.tests.integration.schema.Schemas.Person;
import org.apache.pulsar.tests.integration.schema.Schemas.PersonConsumeSchema;
import org.apache.pulsar.tests.integration.schema.Schemas.Student;
import org.apache.pulsar.tests.integration.schema.Schemas.AvroLogicalType;
import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
import org.awaitility.Awaitility;
import org.testng.annotations.Test;

import java.math.BigDecimal;
Expand Down Expand Up @@ -316,5 +324,14 @@ public void testPrimitiveSchemaTypeCompatibilityCheck() {

}

@Test(invocationCount = 30)
public void testDeletePartitionedTopicWhenTopicReferenceIsNotReady() throws Exception {
final String topic = "persistent://public/default/tp-ref";
admin.topics().createPartitionedTopic(topic, 20);
admin.schemas().createSchema(topic,
SchemaInfo.builder().type(SchemaType.STRING).schema(new byte[0]).build());
admin.topics().deletePartitionedTopic(topic, false);
}

}

0 comments on commit 5563eb0

Please sign in to comment.