-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Independent schema is set for each consumer generated by topic #6356
Changes from 15 commits
cdf7735
145f97e
f27d415
dc85737
65e6f83
260b137
09e1e40
d8e1c03
605061b
3b39049
9c1e24b
f18e248
4ef0e8d
a3285b7
2c580a1
91471ef
266cc9e
1da1611
4e4192c
e97dece
54c6782
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,136 @@ | ||
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.apache.pulsar.schema; | ||
|
||
import com.google.common.collect.Sets; | ||
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; | ||
import org.apache.pulsar.client.api.Consumer; | ||
import org.apache.pulsar.client.api.Producer; | ||
import org.apache.pulsar.client.api.Schema; | ||
import org.apache.pulsar.client.api.schema.SchemaDefinition; | ||
import org.apache.pulsar.common.naming.TopicDomain; | ||
import org.apache.pulsar.common.naming.TopicName; | ||
import org.apache.pulsar.common.policies.data.ClusterData; | ||
import org.apache.pulsar.common.policies.data.TenantInfo; | ||
import org.testng.annotations.AfterMethod; | ||
import org.testng.annotations.BeforeMethod; | ||
import org.testng.annotations.Test; | ||
|
||
import java.util.Collections; | ||
|
||
import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT; | ||
import static org.apache.pulsar.schema.compatibility.SchemaCompatibilityCheckTest.randomName; | ||
import static org.junit.Assert.assertEquals; | ||
|
||
public class SchemaTest extends MockedPulsarServiceBaseTest { | ||
|
||
private final static String CLUSTER_NAME = "test"; | ||
|
||
@BeforeMethod | ||
@Override | ||
public void setup() throws Exception { | ||
super.internalSetup(); | ||
|
||
// Setup namespaces | ||
admin.clusters().createCluster(CLUSTER_NAME, new ClusterData(pulsar.getBrokerServiceUrl())); | ||
TenantInfo tenantInfo = new TenantInfo(); | ||
tenantInfo.setAllowedClusters(Collections.singleton(CLUSTER_NAME)); | ||
admin.tenants().createTenant(PUBLIC_TENANT, tenantInfo); | ||
} | ||
|
||
@AfterMethod | ||
@Override | ||
public void cleanup() throws Exception { | ||
super.internalCleanup(); | ||
} | ||
|
||
@Test | ||
public void testMultiTopicSetSchemaProvider() throws Exception { | ||
final String tenant = PUBLIC_TENANT; | ||
final String namespace = "test-namespace-" + randomName(16); | ||
final String topicOne = "test-multi-version-schema-one"; | ||
final String topicTwo = "test-multi-version-schema-two"; | ||
final String fqtnOne = TopicName.get( | ||
TopicDomain.persistent.value(), | ||
tenant, | ||
namespace, | ||
topicOne | ||
).toString(); | ||
|
||
final String fqtnTwo = TopicName.get( | ||
TopicDomain.persistent.value(), | ||
tenant, | ||
namespace, | ||
topicTwo | ||
).toString(); | ||
|
||
|
||
admin.namespaces().createNamespace( | ||
tenant + "/" + namespace, | ||
Sets.newHashSet(CLUSTER_NAME) | ||
); | ||
|
||
admin.topics().createPartitionedTopic(fqtnOne, 3); | ||
admin.topics().createPartitionedTopic(fqtnTwo, 3); | ||
|
||
admin.schemas().createSchema(fqtnOne, Schema.AVRO( | ||
SchemaDefinition.<Schemas.PersonOne>builder().withAlwaysAllowNull | ||
(false).withSupportSchemaVersioning(true). | ||
withPojo(Schemas.PersonOne.class).build()).getSchemaInfo()); | ||
|
||
admin.schemas().createSchema(fqtnOne, Schema.AVRO( | ||
SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull | ||
(false).withSupportSchemaVersioning(true). | ||
withPojo(Schemas.PersonTwo.class).build()).getSchemaInfo()); | ||
|
||
admin.schemas().createSchema(fqtnTwo, Schema.AVRO( | ||
SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull | ||
(false).withSupportSchemaVersioning(true). | ||
withPojo(Schemas.PersonTwo.class).build()).getSchemaInfo()); | ||
|
||
Producer<Schemas.PersonTwo> producer = pulsarClient.newProducer(Schema.AVRO( | ||
SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull | ||
(false).withSupportSchemaVersioning(true). | ||
withPojo(Schemas.PersonTwo.class).build())) | ||
.topic(fqtnOne) | ||
.create(); | ||
|
||
Schemas.PersonTwo personTwo = new Schemas.PersonTwo(); | ||
personTwo.setId(1); | ||
personTwo.setName("Tom"); | ||
|
||
|
||
Consumer<Schemas.PersonTwo> consumer = pulsarClient.newConsumer(Schema.AVRO( | ||
SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull | ||
(false).withSupportSchemaVersioning(true). | ||
withPojo(Schemas.PersonTwo.class).build())) | ||
.subscriptionName("test") | ||
.topic(fqtnOne, fqtnTwo) | ||
.subscribe(); | ||
|
||
producer.send(personTwo); | ||
|
||
Schemas.PersonTwo personConsume = consumer.receive().getValue(); | ||
assertEquals("Tom", personConsume.getName()); | ||
assertEquals(1, personConsume.getId()); | ||
|
||
producer.close(); | ||
consumer.close(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,7 +16,7 @@ | |
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.apache.pulsar.schema.compatibility; | ||
package org.apache.pulsar.schema; | ||
|
||
import lombok.AllArgsConstructor; | ||
import lombok.Data; | ||
|
@@ -29,36 +29,36 @@ public class Schemas { | |
@NoArgsConstructor | ||
@AllArgsConstructor | ||
public static class PersonOne{ | ||
int id; | ||
public int id; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you please explain why the access modifiers were switched to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. SchemaCompatibilityCheckTest also used it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This class provides all possible constructors and getters/setters for all fields. Please use them instead of defining fields as public |
||
} | ||
|
||
@Data | ||
@AllArgsConstructor | ||
@NoArgsConstructor | ||
public static class PersonTwo{ | ||
int id; | ||
public int id; | ||
|
||
@AvroDefault("\"Tom\"") | ||
String name; | ||
public String name; | ||
} | ||
|
||
@Data | ||
@AllArgsConstructor | ||
@NoArgsConstructor | ||
public static class PersonThree{ | ||
int id; | ||
public int id; | ||
|
||
String name; | ||
public String name; | ||
} | ||
|
||
@Data | ||
@AllArgsConstructor | ||
@NoArgsConstructor | ||
public static class PersonFour{ | ||
int id; | ||
public int id; | ||
|
||
String name; | ||
public String name; | ||
|
||
int age; | ||
public int age; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -136,6 +136,15 @@ default void configureSchemaInfo(String topic, String componentName, | |
// no-op | ||
} | ||
|
||
/** | ||
* Clone schema. | ||
* | ||
* @return cloned schema. | ||
*/ | ||
default Schema<T> cloneSchema() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Did you consider to use the standard There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems don't need to add the flag to control it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @congbobo184 do you mean that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My understanding of what @vzhikserg said deviated, now I understand what he said. I will extends the Cloneable. |
||
return this; | ||
} | ||
|
||
/** | ||
* Schema that doesn't perform any encoding on the message payloads. Accepts a byte array and it passes it through. | ||
*/ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -70,6 +70,9 @@ | |
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData; | ||
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema; | ||
import org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema; | ||
import org.apache.pulsar.client.impl.schema.AvroSchema; | ||
import org.apache.pulsar.client.impl.schema.KeyValueSchema; | ||
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looks the imports are unused. |
||
import org.apache.pulsar.client.impl.schema.generic.MultiVersionSchemaInfoProvider; | ||
import org.apache.pulsar.client.impl.transaction.TransactionBuilderImpl; | ||
import org.apache.pulsar.client.util.ExecutorProvider; | ||
|
@@ -331,7 +334,7 @@ public <T> CompletableFuture<Consumer<T>> subscribeAsync(ConsumerConfigurationDa | |
|
||
private <T> CompletableFuture<Consumer<T>> singleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) { | ||
return preProcessSchemaBeforeSubscribe(this, schema, conf.getSingleTopic()) | ||
.thenCompose(ignored -> doSingleTopicSubscribeAsync(conf, schema, interceptors)); | ||
.thenCompose(schemaClone -> doSingleTopicSubscribeAsync(conf, schemaClone, interceptors)); | ||
} | ||
|
||
private <T> CompletableFuture<Consumer<T>> doSingleTopicSubscribeAsync(ConsumerConfigurationData<T> conf, Schema<T> schema, ConsumerInterceptors<T> interceptors) { | ||
|
@@ -444,7 +447,7 @@ public CompletableFuture<Reader<byte[]>> createReaderAsync(ReaderConfigurationDa | |
|
||
public <T> CompletableFuture<Reader<T>> createReaderAsync(ReaderConfigurationData<T> conf, Schema<T> schema) { | ||
return preProcessSchemaBeforeSubscribe(this, schema, conf.getTopicName()) | ||
.thenCompose(ignored -> doCreateReaderAsync(conf, schema)); | ||
.thenCompose(schemaClone -> doCreateReaderAsync(conf, schemaClone)); | ||
} | ||
|
||
<T> CompletableFuture<Reader<T>> doCreateReaderAsync(ReaderConfigurationData<T> conf, Schema<T> schema) { | ||
|
@@ -764,8 +767,8 @@ private LoadingCache<String, SchemaInfoProvider> getSchemaProviderLoadingCache() | |
} | ||
|
||
@SuppressWarnings("unchecked") | ||
protected CompletableFuture<Void> preProcessSchemaBeforeSubscribe(PulsarClientImpl pulsarClientImpl, | ||
Schema schema, | ||
protected <T> CompletableFuture<Schema<T>> preProcessSchemaBeforeSubscribe(PulsarClientImpl pulsarClientImpl, | ||
Schema<T> schema, | ||
String topicName) { | ||
if (schema != null && schema.supportSchemaVersioning()) { | ||
final SchemaInfoProvider schemaInfoProvider; | ||
|
@@ -775,11 +778,12 @@ protected CompletableFuture<Void> preProcessSchemaBeforeSubscribe(PulsarClientIm | |
log.error("Failed to load schema info provider for topic {}", topicName, e); | ||
return FutureUtil.failedFuture(e.getCause()); | ||
} | ||
|
||
schema = schema.cloneSchema(); | ||
if (schema.requireFetchingSchemaInfo()) { | ||
Schema finalSchema = schema; | ||
return schemaInfoProvider.getLatestSchema().thenCompose(schemaInfo -> { | ||
if (null == schemaInfo) { | ||
if (!(schema instanceof AutoConsumeSchema)) { | ||
if (!(finalSchema instanceof AutoConsumeSchema)) { | ||
// no schema info is found | ||
return FutureUtil.failedFuture( | ||
new PulsarClientException.NotFoundException( | ||
|
@@ -788,18 +792,18 @@ protected CompletableFuture<Void> preProcessSchemaBeforeSubscribe(PulsarClientIm | |
} | ||
try { | ||
log.info("Configuring schema for topic {} : {}", topicName, schemaInfo); | ||
schema.configureSchemaInfo(topicName, "topic", schemaInfo); | ||
finalSchema.configureSchemaInfo(topicName, "topic", schemaInfo); | ||
} catch (RuntimeException re) { | ||
return FutureUtil.failedFuture(re); | ||
} | ||
schema.setSchemaInfoProvider(schemaInfoProvider); | ||
return CompletableFuture.completedFuture(null); | ||
finalSchema.setSchemaInfoProvider(schemaInfoProvider); | ||
return CompletableFuture.completedFuture(finalSchema); | ||
}); | ||
} else { | ||
schema.setSchemaInfoProvider(schemaInfoProvider); | ||
} | ||
} | ||
return CompletableFuture.completedFuture(null); | ||
return CompletableFuture.completedFuture(schema); | ||
} | ||
|
||
// | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -132,6 +132,11 @@ public void configureSchemaInfo(String topicName, | |
} | ||
} | ||
|
||
@Override | ||
public Schema<GenericRecord> cloneSchema() { | ||
return Schema.AUTO_CONSUME(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When you implementing There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. �it don't need to copy fields, AutoConsumeSchema represent it can auto consume, if you want clone the fields you can clone the true schema in AutoConsumeSchema. After all, only its schema is changeable. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I understand it can be changed. but There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you are right, we should clone the fields when clone a schema |
||
} | ||
|
||
private GenericSchema generateSchema(SchemaInfo schemaInfo) { | ||
if (schemaInfo.getType() != SchemaType.AVRO | ||
&& schemaInfo.getType() != SchemaType.JSON) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -76,5 +76,4 @@ public static GenericSchemaImpl of(SchemaInfo schemaInfo, | |
+ schemaInfo.getType() + "'"); | ||
} | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
missing license header