Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support KeyValue Schema Use Null Key And Null Value #7139

Merged
merged 2 commits into from
Jun 4, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.KeyValueSchema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -145,4 +148,94 @@ public void nullValueBooleanSchemaTest() throws PulsarClientException {

}

@Test
public void keyValueNullInlineTest() throws PulsarClientException {
String topic = "persistent://prop/ns-abc/kv-null-value-test";

@Cleanup
Producer<KeyValue<String, String>> producer = pulsarClient
.newProducer(KeyValueSchema.of(Schema.STRING, Schema.STRING))
.topic(topic)
.create();

@Cleanup
Consumer<KeyValue<String, String>> consumer = pulsarClient
.newConsumer(KeyValueSchema.of(Schema.STRING, Schema.STRING))
.topic(topic)
.subscriptionName("test")
.subscribe();

int numMessage = 10;
for (int i = 0; i < numMessage; i++) {
producer.newMessage().value(new KeyValue<>(null, "test")).send();
producer.newMessage().value(new KeyValue<>("test", null)).send();
producer.newMessage().value(new KeyValue<>(null, null)).send();
}

Message<KeyValue<String, String>> message;
KeyValue<String, String> keyValue;
for (int i = 0; i < numMessage; i++) {
message = consumer.receive();
keyValue = message.getValue();
Assert.assertNull(keyValue.getKey());
Assert.assertEquals("test", keyValue.getValue());

message = consumer.receive();
keyValue = message.getValue();
Assert.assertEquals("test", keyValue.getKey());
Assert.assertNull(keyValue.getValue());

message = consumer.receive();
keyValue = message.getValue();
Assert.assertNull(keyValue.getKey());
Assert.assertNull(keyValue.getValue());
}

}

@Test
public void keyValueNullSeparatedTest() throws PulsarClientException {
String topic = "persistent://prop/ns-abc/kv-null-value-test";

@Cleanup
Producer<KeyValue<String, String>> producer = pulsarClient
.newProducer(KeyValueSchema.of(Schema.STRING, Schema.STRING, KeyValueEncodingType.SEPARATED))
.topic(topic)
.create();

@Cleanup
Consumer<KeyValue<String, String>> consumer = pulsarClient
.newConsumer(KeyValueSchema.of(Schema.STRING, Schema.STRING, KeyValueEncodingType.SEPARATED))
.topic(topic)
.subscriptionName("test")
.subscribe();

int numMessage = 10;
for (int i = 0; i < numMessage; i++) {
producer.newMessage().value(new KeyValue<>(null, "test")).send();
producer.newMessage().value(new KeyValue<>("test", null)).send();
producer.newMessage().value(new KeyValue<>(null, null)).send();
}

Message<KeyValue<String, String>> message;
KeyValue<String, String> keyValue;
for (int i = 0; i < numMessage; i++) {
message = consumer.receive();
keyValue = message.getValue();
Assert.assertNull(keyValue.getKey());
Assert.assertEquals("test", keyValue.getValue());

message = consumer.receive();
keyValue = message.getValue();
Assert.assertEquals("test", keyValue.getKey());
Assert.assertNull(keyValue.getValue());

message = consumer.receive();
keyValue = message.getValue();
Assert.assertNull(keyValue.getKey());
Assert.assertNull(keyValue.getValue());
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,25 @@ public interface KeyValueDecoder<K, V> {
*/
public static <K, V> byte[] encode(K key, Schema<K> keyWriter,
V value, Schema<V> valueWriter) {
byte [] keyBytes = keyWriter.encode(key);
byte [] valueBytes = valueWriter.encode(value);
byte [] keyBytes;
if (key == null) {
keyBytes = new byte[0];
} else {
keyBytes = keyWriter.encode(key);
}

byte [] valueBytes;
if (value == null) {
valueBytes = new byte[0];
} else {
valueBytes = valueWriter.encode(value);
}
ByteBuffer byteBuffer = ByteBuffer.allocate(4 + keyBytes.length + 4 + valueBytes.length);
byteBuffer.putInt(keyBytes.length).put(keyBytes).putInt(valueBytes.length).put(valueBytes);
byteBuffer
.putInt(key == null ? -1 : keyBytes.length)
.put(keyBytes)
.putInt(value == null ? -1 : valueBytes.length)
.put(valueBytes);
return byteBuffer.array();
}

Expand All @@ -113,12 +128,16 @@ public static <K, V> byte[] encode(K key, Schema<K> keyWriter,
public static <K, V> KeyValue<K, V> decode(byte[] data, KeyValueDecoder<K, V> decoder) {
ByteBuffer byteBuffer = ByteBuffer.wrap(data);
int keyLength = byteBuffer.getInt();
byte[] keyBytes = new byte[keyLength];
byteBuffer.get(keyBytes);
byte[] keyBytes = keyLength == -1 ? null : new byte[keyLength];
if (keyBytes != null) {
byteBuffer.get(keyBytes);
}

int valueLength = byteBuffer.getInt();
byte[] valueBytes = new byte[valueLength];
byteBuffer.get(valueBytes);
byte[] valueBytes = valueLength == -1 ? null : new byte[valueLength];
if (valueBytes != null) {
byteBuffer.get(valueBytes);
}

return decoder.decode(keyBytes, valueBytes);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@ public static <T> MessageImpl<T> create(MessageMetadata.Builder msgMetadataBuild
msgMetadataBuilder.setNullValue(singleMessageMetadata.hasNullValue());
}

if (singleMessageMetadata.hasNullKey()) {
msgMetadataBuilder.setNullKey(singleMessageMetadata.hasNullKey());
}

this.schema = schema;
}

Expand Down Expand Up @@ -269,16 +273,16 @@ public byte[] getSchemaVersion() {
@Override
public T getValue() {
checkNotNull(msgMetadataBuilder);
if (msgMetadataBuilder.hasNullValue()) {
return null;
}
if (schema.getSchemaInfo() != null && SchemaType.KEY_VALUE == schema.getSchemaInfo().getType()) {
if (schema.supportSchemaVersioning()) {
return getKeyValueBySchemaVersion();
} else {
return getKeyValue();
}
} else {
if (msgMetadataBuilder.hasNullValue()) {
return null;
}
// check if the schema passed in from client supports schema versioning or not
// this is an optimization to only get schema version when necessary
if (schema.supportSchemaVersioning()) {
Expand All @@ -298,7 +302,9 @@ private T getKeyValueBySchemaVersion() {
KeyValueSchema kvSchema = (KeyValueSchema) schema;
byte[] schemaVersion = getSchemaVersion();
if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
return (T) kvSchema.decode(getKeyBytes(), getData(), schemaVersion);
return (T) kvSchema.decode(
msgMetadataBuilder.hasNullKey() ? null : getKeyBytes(),
msgMetadataBuilder.hasNullValue() ? null : getData(), schemaVersion);
} else {
return schema.decode(getData(), schemaVersion);
}
Expand All @@ -307,7 +313,9 @@ private T getKeyValueBySchemaVersion() {
private T getKeyValue() {
KeyValueSchema kvSchema = (KeyValueSchema) schema;
if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
return (T) kvSchema.decode(getKeyBytes(), getData(), null);
return (T) kvSchema.decode(
msgMetadataBuilder.hasNullKey() ? null : getKeyBytes(),
msgMetadataBuilder.hasNullValue() ? null : getData(), null);
} else {
return schema.decode(getData());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ public TypedMessageBuilder<T> key(String key) {
KeyValueSchema kvSchema = (KeyValueSchema) schema;
checkArgument(!(kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED),
"This method is not allowed to set keys when in encoding type is SEPARATED");
if (key == null) {
msgMetadataBuilder.setNullKey(true);
return this;
}
}
msgMetadataBuilder.setPartitionKey(key);
msgMetadataBuilder.setPartitionKeyB64Encoded(false);
Expand All @@ -123,6 +127,10 @@ public TypedMessageBuilder<T> keyBytes(byte[] key) {
KeyValueSchema kvSchema = (KeyValueSchema) schema;
checkArgument(!(kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED),
"This method is not allowed to set keys when in encoding type is SEPARATED");
if (key == null) {
msgMetadataBuilder.setNullKey(true);
return this;
}
}
msgMetadataBuilder.setPartitionKey(Base64.getEncoder().encodeToString(key));
msgMetadataBuilder.setPartitionKeyB64Encoded(true);
Expand All @@ -146,11 +154,20 @@ public TypedMessageBuilder<T> value(T value) {
org.apache.pulsar.common.schema.KeyValue kv = (org.apache.pulsar.common.schema.KeyValue) value;
if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
// set key as the message key
msgMetadataBuilder.setPartitionKey(
Base64.getEncoder().encodeToString(kvSchema.getKeySchema().encode(kv.getKey())));
msgMetadataBuilder.setPartitionKeyB64Encoded(true);
if (kv.getKey() != null) {
msgMetadataBuilder.setPartitionKey(
Base64.getEncoder().encodeToString(kvSchema.getKeySchema().encode(kv.getKey())));
msgMetadataBuilder.setPartitionKeyB64Encoded(true);
} else {
this.msgMetadataBuilder.setNullKey(true);
}

// set value as the payload
this.content = ByteBuffer.wrap(kvSchema.getValueSchema().encode(kv.getValue()));
if (kv.getValue() != null) {
this.content = ByteBuffer.wrap(kvSchema.getValueSchema().encode(kv.getValue()));
} else {
this.msgMetadataBuilder.setNullValue(true);
}
return this;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ public byte[] encode(KeyValue<K, V> message) {
valueSchema
);
} else {
if (message.getValue() == null) {
return null;
}
return valueSchema.encode(message.getValue());
}
}
Expand All @@ -150,16 +153,25 @@ public KeyValue<K, V> decode(byte[] bytes, byte[] schemaVersion) {

public KeyValue<K, V> decode(byte[] keyBytes, byte[] valueBytes, byte[] schemaVersion) {
K k;
if (keySchema.supportSchemaVersioning() && schemaVersion != null) {
k = keySchema.decode(keyBytes, schemaVersion);
if (keyBytes == null) {
k = null;
} else {
k = keySchema.decode(keyBytes);
if (keySchema.supportSchemaVersioning() && schemaVersion != null) {
k = keySchema.decode(keyBytes, schemaVersion);
} else {
k = keySchema.decode(keyBytes);
}
}

V v;
if (valueSchema.supportSchemaVersioning() && schemaVersion != null) {
v = valueSchema.decode(valueBytes, schemaVersion);
if (valueBytes == null) {
v = null;
} else {
v = valueSchema.decode(valueBytes);
if (valueSchema.supportSchemaVersioning() && schemaVersion != null) {
v = valueSchema.decode(valueBytes, schemaVersion);
} else {
v = valueSchema.decode(valueBytes);
}
}
return new KeyValue<>(k, v);
}
Expand Down
Loading