Skip to content

Commit

Permalink
Support KeyValue Schema Use Null Key And Null Value (apache#7139)
Browse files Browse the repository at this point in the history
Fixes apache#4804  

Thanks, @nlu90, work at apache#6384.

### Motivation

Currently, the KeyValue schema doesn't handle `null` key and `null` value well.
  • Loading branch information
gaoran10 authored and cdbartholomew committed Jul 24, 2020
1 parent d17ad53 commit 6bc8af2
Show file tree
Hide file tree
Showing 8 changed files with 294 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.KeyValueSchema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -145,4 +148,94 @@ public void nullValueBooleanSchemaTest() throws PulsarClientException {

}

@Test
public void keyValueNullInlineTest() throws PulsarClientException {
String topic = "persistent://prop/ns-abc/kv-null-value-test";

@Cleanup
Producer<KeyValue<String, String>> producer = pulsarClient
.newProducer(KeyValueSchema.of(Schema.STRING, Schema.STRING))
.topic(topic)
.create();

@Cleanup
Consumer<KeyValue<String, String>> consumer = pulsarClient
.newConsumer(KeyValueSchema.of(Schema.STRING, Schema.STRING))
.topic(topic)
.subscriptionName("test")
.subscribe();

int numMessage = 10;
for (int i = 0; i < numMessage; i++) {
producer.newMessage().value(new KeyValue<>(null, "test")).send();
producer.newMessage().value(new KeyValue<>("test", null)).send();
producer.newMessage().value(new KeyValue<>(null, null)).send();
}

Message<KeyValue<String, String>> message;
KeyValue<String, String> keyValue;
for (int i = 0; i < numMessage; i++) {
message = consumer.receive();
keyValue = message.getValue();
Assert.assertNull(keyValue.getKey());
Assert.assertEquals("test", keyValue.getValue());

message = consumer.receive();
keyValue = message.getValue();
Assert.assertEquals("test", keyValue.getKey());
Assert.assertNull(keyValue.getValue());

message = consumer.receive();
keyValue = message.getValue();
Assert.assertNull(keyValue.getKey());
Assert.assertNull(keyValue.getValue());
}

}

@Test
public void keyValueNullSeparatedTest() throws PulsarClientException {
String topic = "persistent://prop/ns-abc/kv-null-value-test";

@Cleanup
Producer<KeyValue<String, String>> producer = pulsarClient
.newProducer(KeyValueSchema.of(Schema.STRING, Schema.STRING, KeyValueEncodingType.SEPARATED))
.topic(topic)
.create();

@Cleanup
Consumer<KeyValue<String, String>> consumer = pulsarClient
.newConsumer(KeyValueSchema.of(Schema.STRING, Schema.STRING, KeyValueEncodingType.SEPARATED))
.topic(topic)
.subscriptionName("test")
.subscribe();

int numMessage = 10;
for (int i = 0; i < numMessage; i++) {
producer.newMessage().value(new KeyValue<>(null, "test")).send();
producer.newMessage().value(new KeyValue<>("test", null)).send();
producer.newMessage().value(new KeyValue<>(null, null)).send();
}

Message<KeyValue<String, String>> message;
KeyValue<String, String> keyValue;
for (int i = 0; i < numMessage; i++) {
message = consumer.receive();
keyValue = message.getValue();
Assert.assertNull(keyValue.getKey());
Assert.assertEquals("test", keyValue.getValue());

message = consumer.receive();
keyValue = message.getValue();
Assert.assertEquals("test", keyValue.getKey());
Assert.assertNull(keyValue.getValue());

message = consumer.receive();
keyValue = message.getValue();
Assert.assertNull(keyValue.getKey());
Assert.assertNull(keyValue.getValue());
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,25 @@ public interface KeyValueDecoder<K, V> {
*/
public static <K, V> byte[] encode(K key, Schema<K> keyWriter,
V value, Schema<V> valueWriter) {
byte [] keyBytes = keyWriter.encode(key);
byte [] valueBytes = valueWriter.encode(value);
byte [] keyBytes;
if (key == null) {
keyBytes = new byte[0];
} else {
keyBytes = keyWriter.encode(key);
}

byte [] valueBytes;
if (value == null) {
valueBytes = new byte[0];
} else {
valueBytes = valueWriter.encode(value);
}
ByteBuffer byteBuffer = ByteBuffer.allocate(4 + keyBytes.length + 4 + valueBytes.length);
byteBuffer.putInt(keyBytes.length).put(keyBytes).putInt(valueBytes.length).put(valueBytes);
byteBuffer
.putInt(key == null ? -1 : keyBytes.length)
.put(keyBytes)
.putInt(value == null ? -1 : valueBytes.length)
.put(valueBytes);
return byteBuffer.array();
}

Expand All @@ -113,12 +128,16 @@ public static <K, V> byte[] encode(K key, Schema<K> keyWriter,
public static <K, V> KeyValue<K, V> decode(byte[] data, KeyValueDecoder<K, V> decoder) {
ByteBuffer byteBuffer = ByteBuffer.wrap(data);
int keyLength = byteBuffer.getInt();
byte[] keyBytes = new byte[keyLength];
byteBuffer.get(keyBytes);
byte[] keyBytes = keyLength == -1 ? null : new byte[keyLength];
if (keyBytes != null) {
byteBuffer.get(keyBytes);
}

int valueLength = byteBuffer.getInt();
byte[] valueBytes = new byte[valueLength];
byteBuffer.get(valueBytes);
byte[] valueBytes = valueLength == -1 ? null : new byte[valueLength];
if (valueBytes != null) {
byteBuffer.get(valueBytes);
}

return decoder.decode(keyBytes, valueBytes);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@ public static <T> MessageImpl<T> create(MessageMetadata.Builder msgMetadataBuild
msgMetadataBuilder.setNullValue(singleMessageMetadata.hasNullValue());
}

if (singleMessageMetadata.hasNullPartitionKey()) {
msgMetadataBuilder.setNullPartitionKey(singleMessageMetadata.hasNullPartitionKey());
}

this.schema = schema;
}

Expand Down Expand Up @@ -269,16 +273,16 @@ public byte[] getSchemaVersion() {
@Override
public T getValue() {
checkNotNull(msgMetadataBuilder);
if (msgMetadataBuilder.hasNullValue()) {
return null;
}
if (schema.getSchemaInfo() != null && SchemaType.KEY_VALUE == schema.getSchemaInfo().getType()) {
if (schema.supportSchemaVersioning()) {
return getKeyValueBySchemaVersion();
} else {
return getKeyValue();
}
} else {
if (msgMetadataBuilder.hasNullValue()) {
return null;
}
// check if the schema passed in from client supports schema versioning or not
// this is an optimization to only get schema version when necessary
if (schema.supportSchemaVersioning()) {
Expand All @@ -298,7 +302,9 @@ private T getKeyValueBySchemaVersion() {
KeyValueSchema kvSchema = (KeyValueSchema) schema;
byte[] schemaVersion = getSchemaVersion();
if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
return (T) kvSchema.decode(getKeyBytes(), getData(), schemaVersion);
return (T) kvSchema.decode(
msgMetadataBuilder.hasNullPartitionKey() ? null : getKeyBytes(),
msgMetadataBuilder.hasNullValue() ? null : getData(), schemaVersion);
} else {
return schema.decode(getData(), schemaVersion);
}
Expand All @@ -307,7 +313,9 @@ private T getKeyValueBySchemaVersion() {
private T getKeyValue() {
KeyValueSchema kvSchema = (KeyValueSchema) schema;
if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
return (T) kvSchema.decode(getKeyBytes(), getData(), null);
return (T) kvSchema.decode(
msgMetadataBuilder.hasNullPartitionKey() ? null : getKeyBytes(),
msgMetadataBuilder.hasNullValue() ? null : getData(), null);
} else {
return schema.decode(getData());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ public TypedMessageBuilder<T> key(String key) {
KeyValueSchema kvSchema = (KeyValueSchema) schema;
checkArgument(!(kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED),
"This method is not allowed to set keys when in encoding type is SEPARATED");
if (key == null) {
msgMetadataBuilder.setNullPartitionKey(true);
return this;
}
}
msgMetadataBuilder.setPartitionKey(key);
msgMetadataBuilder.setPartitionKeyB64Encoded(false);
Expand All @@ -123,6 +127,10 @@ public TypedMessageBuilder<T> keyBytes(byte[] key) {
KeyValueSchema kvSchema = (KeyValueSchema) schema;
checkArgument(!(kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED),
"This method is not allowed to set keys when in encoding type is SEPARATED");
if (key == null) {
msgMetadataBuilder.setNullPartitionKey(true);
return this;
}
}
msgMetadataBuilder.setPartitionKey(Base64.getEncoder().encodeToString(key));
msgMetadataBuilder.setPartitionKeyB64Encoded(true);
Expand All @@ -146,11 +154,20 @@ public TypedMessageBuilder<T> value(T value) {
org.apache.pulsar.common.schema.KeyValue kv = (org.apache.pulsar.common.schema.KeyValue) value;
if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
// set key as the message key
msgMetadataBuilder.setPartitionKey(
Base64.getEncoder().encodeToString(kvSchema.getKeySchema().encode(kv.getKey())));
msgMetadataBuilder.setPartitionKeyB64Encoded(true);
if (kv.getKey() != null) {
msgMetadataBuilder.setPartitionKey(
Base64.getEncoder().encodeToString(kvSchema.getKeySchema().encode(kv.getKey())));
msgMetadataBuilder.setPartitionKeyB64Encoded(true);
} else {
this.msgMetadataBuilder.setNullPartitionKey(true);
}

// set value as the payload
this.content = ByteBuffer.wrap(kvSchema.getValueSchema().encode(kv.getValue()));
if (kv.getValue() != null) {
this.content = ByteBuffer.wrap(kvSchema.getValueSchema().encode(kv.getValue()));
} else {
this.msgMetadataBuilder.setNullValue(true);
}
return this;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ public byte[] encode(KeyValue<K, V> message) {
valueSchema
);
} else {
if (message.getValue() == null) {
return null;
}
return valueSchema.encode(message.getValue());
}
}
Expand All @@ -150,16 +153,25 @@ public KeyValue<K, V> decode(byte[] bytes, byte[] schemaVersion) {

public KeyValue<K, V> decode(byte[] keyBytes, byte[] valueBytes, byte[] schemaVersion) {
K k;
if (keySchema.supportSchemaVersioning() && schemaVersion != null) {
k = keySchema.decode(keyBytes, schemaVersion);
if (keyBytes == null) {
k = null;
} else {
k = keySchema.decode(keyBytes);
if (keySchema.supportSchemaVersioning() && schemaVersion != null) {
k = keySchema.decode(keyBytes, schemaVersion);
} else {
k = keySchema.decode(keyBytes);
}
}

V v;
if (valueSchema.supportSchemaVersioning() && schemaVersion != null) {
v = valueSchema.decode(valueBytes, schemaVersion);
if (valueBytes == null) {
v = null;
} else {
v = valueSchema.decode(valueBytes);
if (valueSchema.supportSchemaVersioning() && schemaVersion != null) {
v = valueSchema.decode(valueBytes, schemaVersion);
} else {
v = valueSchema.decode(valueBytes);
}
}
return new KeyValue<>(k, v);
}
Expand Down
Loading

0 comments on commit 6bc8af2

Please sign in to comment.