Skip to content

Commit 4c84788

Browse files
authored
[improve][broker] Improve exception for topic does not have schema to check (#22974)
1 parent 7f4c0c5 commit 4c84788

File tree

5 files changed

+80
-8
lines changed

5 files changed

+80
-8
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java

+12-1
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@
6666
import org.apache.pulsar.broker.service.TopicAttributes;
6767
import org.apache.pulsar.broker.service.TopicPolicyListener;
6868
import org.apache.pulsar.broker.service.TransportCnx;
69+
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
70+
import org.apache.pulsar.broker.service.schema.exceptions.NotExistSchemaException;
6971
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
7072
import org.apache.pulsar.broker.stats.NamespaceStats;
7173
import org.apache.pulsar.client.api.MessageId;
@@ -1239,7 +1241,16 @@ public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schem
12391241
|| (!producers.isEmpty())
12401242
|| (numActiveConsumersWithoutAutoSchema != 0)
12411243
|| ENTRIES_ADDED_COUNTER_UPDATER.get(this) != 0) {
1242-
return checkSchemaCompatibleForConsumer(schema);
1244+
return checkSchemaCompatibleForConsumer(schema)
1245+
.exceptionally(ex -> {
1246+
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
1247+
if (realCause instanceof NotExistSchemaException) {
1248+
throw FutureUtil.wrapToCompletionException(
1249+
new IncompatibleSchemaException("Failed to add schema to an active topic"
1250+
+ " with empty(BYTES) schema: new schema type " + schema.getType()));
1251+
}
1252+
throw FutureUtil.wrapToCompletionException(realCause);
1253+
});
12431254
} else {
12441255
return addSchema(schema).thenCompose(schemaVersion -> CompletableFuture.completedFuture(null));
12451256
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

+12-1
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,8 @@
132132
import org.apache.pulsar.broker.service.TransportCnx;
133133
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
134134
import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
135+
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
136+
import org.apache.pulsar.broker.service.schema.exceptions.NotExistSchemaException;
135137
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
136138
import org.apache.pulsar.broker.stats.NamespaceStats;
137139
import org.apache.pulsar.broker.stats.ReplicationMetrics;
@@ -4048,7 +4050,16 @@ public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schem
40484050
|| (userCreatedProducerCount > 0)
40494051
|| (numActiveConsumersWithoutAutoSchema != 0)
40504052
|| (ledger.getTotalSize() != 0)) {
4051-
return checkSchemaCompatibleForConsumer(schema);
4053+
return checkSchemaCompatibleForConsumer(schema)
4054+
.exceptionally(ex -> {
4055+
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
4056+
if (realCause instanceof NotExistSchemaException) {
4057+
throw FutureUtil.wrapToCompletionException(
4058+
new IncompatibleSchemaException("Failed to add schema to an active topic"
4059+
+ " with empty(BYTES) schema: new schema type " + schema.getType()));
4060+
}
4061+
throw FutureUtil.wrapToCompletionException(realCause);
4062+
});
40524063
} else {
40534064
return addSchema(schema).thenCompose(schemaVersion ->
40544065
CompletableFuture.completedFuture(null));

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.apache.commons.lang3.tuple.Pair;
4949
import org.apache.pulsar.broker.PulsarService;
5050
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
51+
import org.apache.pulsar.broker.service.schema.exceptions.NotExistSchemaException;
5152
import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
5253
import org.apache.pulsar.broker.service.schema.proto.SchemaRegistryFormat;
5354
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
@@ -398,7 +399,7 @@ public CompletableFuture<Void> checkConsumerCompatibility(String schemaId, Schem
398399
return checkCompatibilityWithAll(schemaId, schemaData, strategy);
399400
}
400401
} else {
401-
return FutureUtil.failedFuture(new IncompatibleSchemaException("Topic does not have schema to check"));
402+
return FutureUtil.failedFuture(new NotExistSchemaException("Topic does not have schema to check"));
402403
}
403404
});
404405
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.service.schema.exceptions;
20+
21+
/**
22+
* Exception is thrown when an schema not exist.
23+
*/
24+
public class NotExistSchemaException extends SchemaException {
25+
26+
private static final long serialVersionUID = -8342983749283749283L;
27+
28+
public NotExistSchemaException() {
29+
super("The schema does not exist");
30+
}
31+
32+
public NotExistSchemaException(String message) {
33+
super(message);
34+
}
35+
36+
public NotExistSchemaException(String message, Throwable e) {
37+
super(message, e);
38+
}
39+
40+
public NotExistSchemaException(Throwable e) {
41+
super(e);
42+
}
43+
}

pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java

+11-5
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@
9696
import org.testng.Assert;
9797
import org.testng.annotations.AfterMethod;
9898
import org.testng.annotations.BeforeMethod;
99+
import org.testng.annotations.DataProvider;
99100
import org.testng.annotations.Test;
100101

101102
@Slf4j
@@ -125,6 +126,11 @@ public void cleanup() throws Exception {
125126
super.internalCleanup();
126127
}
127128

129+
@DataProvider(name = "topicDomain")
130+
public static Object[] topicDomain() {
131+
return new Object[] { "persistent://", "non-persistent://" };
132+
}
133+
128134
@Test
129135
public void testGetSchemaWhenCreateAutoProduceBytesProducer() throws Exception{
130136
final String tenant = PUBLIC_TENANT;
@@ -1336,19 +1342,19 @@ private void testIncompatibleSchema() throws Exception {
13361342
* the new consumer to register new schema. But before we can solve this problem, we need to modify
13371343
* "CmdProducer" to let the Broker know that the Producer uses a schema of type "AUTO_PRODUCE_BYTES".
13381344
*/
1339-
@Test
1340-
public void testAutoProduceAndSpecifiedConsumer() throws Exception {
1345+
@Test(dataProvider = "topicDomain")
1346+
public void testAutoProduceAndSpecifiedConsumer(String domain) throws Exception {
13411347
final String namespace = PUBLIC_TENANT + "/ns_" + randomName(16);
13421348
admin.namespaces().createNamespace(namespace, Sets.newHashSet(CLUSTER_NAME));
1343-
final String topicName = "persistent://" + namespace + "/tp_" + randomName(16);
1349+
final String topicName = domain + namespace + "/tp_" + randomName(16);
13441350
admin.topics().createNonPartitionedTopic(topicName);
13451351

13461352
Producer producer = pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(topicName).create();
13471353
try {
13481354
pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName("sub1").subscribe();
1349-
fail("Should throw ex: Topic does not have schema to check");
1355+
fail("Should throw ex: Failed to add schema to an active topic with empty(BYTES) schema");
13501356
} catch (Exception ex){
1351-
assertTrue(ex.getMessage().contains("Topic does not have schema to check"));
1357+
assertTrue(ex.getMessage().contains("Failed to add schema to an active topic with empty(BYTES) schema"));
13521358
}
13531359

13541360
// Cleanup.

0 commit comments

Comments
 (0)