Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support multiple Avro schema version in Pulsar SQL #4847

Merged
merged 23 commits into from
May 19, 2020
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
0e0e9a0
Pulsar sql support schema version
congbobo Jul 30, 2019
5ba4030
Merge remote-tracking branch 'apache/master' into pulsar_sql_support_…
congbobo Sep 5, 2019
277ecfb
Merge remote-tracking branch 'apache/master' into pulsar_sql_support_…
congbobo Oct 10, 2019
b87e142
Add the decode byteBuf and modify test
congbobo Oct 10, 2019
244d22f
Merge remote-tracking branch 'apache/master' into pulsar_sql_support_…
congbobo Oct 11, 2019
1771681
modify the code style
congbobo Oct 11, 2019
137e1be
Modify the code check style
congbobo Oct 11, 2019
235048a
Merge remote-tracking branch 'apache/master' into pulsar_sql_support_…
congbobo Dec 4, 2019
123f5d8
Fix schema version provider key is byte[]
congbobo Dec 4, 2019
e06f6be
Fix some comments
congbobo Dec 9, 2019
1779434
Merge remote-tracking branch 'apache/master' into pulsar_sql_support_…
congbobo Dec 9, 2019
f8c11f0
Modify the codeStyle
congbobo Dec 9, 2019
78e86b5
Modify the check style
congbobo Dec 10, 2019
78df48c
Merge remote-tracking branch 'apache/master' into pulsar_sql_support_…
congbobo Feb 15, 2020
910b127
Merge remote-tracking branch 'apache/master' into pulsar_sql_support_…
congbobo Mar 10, 2020
57132c4
Key value add multiVersionSchema
congbobo Mar 10, 2020
51239bd
Fix some comment
congbobo Mar 17, 2020
bfce092
Merge remote-tracking branch 'apache/master' into pulsar_sql_support_…
congbobo Mar 19, 2020
59e2c40
no message
congbobo Mar 28, 2020
1cac019
Merge remote-tracking branch 'apache/master' into pulsar_sql_support_…
congbobo Mar 29, 2020
13ecb9a
Fix some comments
congbobo Mar 29, 2020
c9bd00f
Fix some comments
congbobo Mar 30, 2020
47aaccd
no message
congbobo Mar 30, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@

import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.LongSchemaVersion;
import org.apache.pulsar.common.schema.LongSchemaVersion;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.common.protocol.schema.SchemaStorage;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.LongSchemaVersion;
import org.apache.pulsar.common.protocol.schema.StoredSchema;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.zookeeper.CreateMode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaHash;
import org.apache.pulsar.common.schema.LongSchemaVersion;
import org.apache.pulsar.common.protocol.schema.SchemaStorage;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.nio.ByteBuffer;
import org.apache.bookkeeper.client.api.BKException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.common.schema.LongSchemaVersion;
import org.apache.pulsar.zookeeper.LocalZooKeeperCache;
import org.testng.annotations.Test;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.schema.LongSchemaVersion;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.testng.annotations.AfterMethod;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

import org.apache.avro.reflect.ReflectData;
import org.apache.avro.Schema.Parser;
import org.apache.pulsar.broker.service.schema.LongSchemaVersion;
import org.apache.pulsar.common.schema.LongSchemaVersion;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException.IncompatibleSchemaException;
import org.apache.pulsar.client.api.PulsarClientException.InvalidMessageException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;

abstract class AbstractSchema<T> implements Schema<T> {
public abstract class AbstractSchema<T> implements Schema<T> {

/**
* Check if the message readable length is a valid object for this schema.
Expand All @@ -47,7 +47,7 @@ void validate(ByteBuf byteBuf) {
* the byte buffer to decode
* @return the deserialized object
*/
abstract T decode(ByteBuf byteBuf);
public abstract T decode(ByteBuf byteBuf);
/**
* Decode a byteBuf into an object using a given version.
*
Expand All @@ -57,7 +57,7 @@ void validate(ByteBuf byteBuf) {
* the schema version to decode the object. null indicates using latest version.
* @return the deserialized object
*/
T decode(ByteBuf byteBuf, byte[] schemaVersion) {
public T decode(ByteBuf byteBuf, byte[] schemaVersion) {
// ignore version by default (most of the primitive schema implementations ignore schema version)
return decode(byteBuf);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ public T decode(byte[] bytes) {
@Override
public T decode(byte[] bytes, byte[] schemaVersion) {
try {
return readerCache.get(BytesSchemaVersion.of(schemaVersion)).read(bytes);
return schemaVersion == null ? decode(bytes) :
readerCache.get(BytesSchemaVersion.of(schemaVersion)).read(bytes);
} catch (ExecutionException | AvroTypeException e) {
if (e instanceof AvroTypeException) {
throw new SchemaSerializationException(e);
Expand All @@ -116,7 +117,8 @@ public T decode(ByteBuf byteBuf) {
@Override
public T decode(ByteBuf byteBuf, byte[] schemaVersion) {
try {
return readerCache.get(BytesSchemaVersion.of(schemaVersion)).read(new ByteBufInputStream(byteBuf));
return schemaVersion == null ? decode(byteBuf) :
readerCache.get(BytesSchemaVersion.of(schemaVersion)).read(new ByteBufInputStream(byteBuf));
} catch (ExecutionException e) {
LOG.error("Can't get generic schema for topic {} schema version {}",
schemaInfoProvider.getTopicName(), Hex.encodeHexString(schemaVersion), e);
Expand Down Expand Up @@ -174,7 +176,7 @@ protected static org.apache.avro.Schema parseAvroSchema(String schemaJson) {
return parser.parse(schemaJson);
}

protected static <T> SchemaInfo parseSchemaInfo(SchemaDefinition<T> schemaDefinition, SchemaType schemaType) {
public static <T> SchemaInfo parseSchemaInfo(SchemaDefinition<T> schemaDefinition, SchemaType schemaType) {
return SchemaInfo.builder()
.schema(createAvroSchema(schemaDefinition).toString().getBytes(UTF_8))
.properties(schemaDefinition.getProperties())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@ public interface RawMessage {
*/
Optional<String> getKey();

/**
* Get the schema version of the message.
*
* @return the schema version of the message
*/
byte[] getSchemaVersion();

/**
* Get byteBuf of the key.
*
Expand All @@ -115,5 +122,4 @@ public interface RawMessage {
* @return true if the key is base64 encoded, false otherwise
*/
boolean hasBase64EncodedKey();

}
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,14 @@ public Optional<String> getKey() {
}

@Override
public byte[] getSchemaVersion() {
    // No metadata, or metadata without a schema version: report null to the caller.
    if (msgMetadata == null || !msgMetadata.get().hasSchemaVersion()) {
        return null;
    }
    return msgMetadata.get().getSchemaVersion().toByteArray();
}

public Optional<ByteBuf> getKeyBytes() {
if (getKey().isPresent()) {
if (hasBase64EncodedKey()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.schema;
package org.apache.pulsar.common.schema;

import com.google.common.base.MoreObjects;
import java.nio.ByteBuffer;
import java.util.Objects;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;

/**
* Long schema version.
*/
public class LongSchemaVersion implements SchemaVersion {
private final long version;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/**
* Common implementation classes for Pulsar schemas.
*/
package org.apache.pulsar.common.schema;
Original file line number Diff line number Diff line change
Expand Up @@ -18,92 +18,87 @@
*/
package org.apache.pulsar.sql.presto;

import com.google.common.annotations.VisibleForTesting;

import io.airlift.log.Logger;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.FastThreadLocal;
import java.io.IOException;

import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;

import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.SchemaInfo;


/**
* Schema handler for payload in the Avro format.
*/
public class AvroSchemaHandler implements SchemaHandler {

private final DatumReader<GenericRecord> datumReader;

private final List<PulsarColumnHandle> columnHandles;

private static final FastThreadLocal<BinaryDecoder> decoders =
new FastThreadLocal<>();
private final GenericAvroSchema genericAvroSchema;

private final SchemaInfo schemaInfo;

private static final Logger log = Logger.get(AvroSchemaHandler.class);

public AvroSchemaHandler(Schema schema, List<PulsarColumnHandle> columnHandles) {
this.datumReader = new GenericDatumReader<>(schema);
/**
 * Creates a handler for the given topic by building a {@link PulsarSqlSchemaInfoProvider}
 * from the topic name and the connector config's Pulsar admin, then delegating to the
 * provider-based constructor.
 *
 * @param topicName topic name passed to the schema info provider
 * @param pulsarConnectorConfig connector configuration supplying the Pulsar admin
 * @param schemaInfo schema info forwarded to the delegate constructor
 * @param columnHandles column handles forwarded to the delegate constructor
 * @throws PulsarClientException declared by this constructor; presumably raised while
 *         obtaining the Pulsar admin from the connector config (TODO confirm)
 */
AvroSchemaHandler(TopicName topicName,
PulsarConnectorConfig pulsarConnectorConfig,
SchemaInfo schemaInfo,
List<PulsarColumnHandle> columnHandles) throws PulsarClientException {
this(new PulsarSqlSchemaInfoProvider(topicName,
pulsarConnectorConfig.getPulsarAdmin()), schemaInfo, columnHandles);
}

/**
 * Builds a handler that decodes payloads through a {@link GenericAvroSchema} created
 * from the given schema info and wired to the given schema info provider.
 *
 * @param pulsarSqlSchemaInfoProvider provider set on the generic Avro schema
 * @param schemaInfo schema info used to create the generic Avro schema
 * @param columnHandles column handles kept for later field extraction
 */
AvroSchemaHandler(PulsarSqlSchemaInfoProvider pulsarSqlSchemaInfoProvider,
        SchemaInfo schemaInfo, List<PulsarColumnHandle> columnHandles) {
    this.columnHandles = columnHandles;
    this.schemaInfo = schemaInfo;
    this.genericAvroSchema = new GenericAvroSchema(schemaInfo);
    genericAvroSchema.setSchemaInfoProvider(pulsarSqlSchemaInfoProvider);
}

@Override
public Object deserialize(ByteBuf payload) {

ByteBuf heapBuffer = null;
try {
BinaryDecoder decoderFromCache = decoders.get();

// Make a copy into a heap buffer, since Avro cannot deserialize directly from direct memory
int size = payload.readableBytes();
heapBuffer = ByteBufAllocator.DEFAULT.heapBuffer(size, size);
heapBuffer.writeBytes(payload);

BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(heapBuffer.array(), heapBuffer.arrayOffset(),
heapBuffer.readableBytes(), decoderFromCache);
if (decoderFromCache == null) {
decoders.set(decoder);
}
return this.datumReader.read(null, decoder);
} catch (IOException e) {
log.error(e);
} finally {
ReferenceCountUtil.safeRelease(heapBuffer);
}
return null;
return genericAvroSchema.decode(payload);
}

@Override
public Object deserialize(ByteBuf keyPayload, ByteBuf dataPayload) {
return null;
public Object deserialize(ByteBuf payload, byte[] schemaVersion) {
return genericAvroSchema.decode(payload, schemaVersion);
}

@Override
public Object extractField(int index, Object currentRecord) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't quite understand the reason for the change in this method. I think to support multiple schema version decode does not affect extract field from the GenericRecord right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I only add a deserialize method, and add a default interface for the keyPayload deserialize.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm talking about extractField(int index, Object currentRecord) method. I noticed the new change is read by field names and we use read by position index before, is the previous method not enough to support multiple schema versions?

try {
GenericRecord record = (GenericRecord) currentRecord;
GenericAvroRecord record = (GenericAvroRecord) currentRecord;
PulsarColumnHandle pulsarColumnHandle = this.columnHandles.get(index);
Integer[] positionIndices = pulsarColumnHandle.getPositionIndices();
Object curr = record.get(positionIndices[0]);
if (curr == null) {
return null;
}
if (positionIndices.length > 0) {
for (int i = 1; i < positionIndices.length; i++) {
curr = ((GenericRecord) curr).get(positionIndices[i]);
if (curr == null) {
return null;
}
String[] names = pulsarColumnHandle.getFieldNames();

if (names.length == 1) {
return record.getField(pulsarColumnHandle.getFieldNames()[0]);
} else {
for (int i = 0; i < names.length - 1; i++) {
record = (GenericAvroRecord) record.getField(names[i]);
}
return record.getField(names[names.length - 1]);
}
return curr;
} catch (Exception ex) {
log.debug(ex, "%s", ex);
}
return null;
}

/**
 * Exposes the generic Avro schema this handler decodes with; intended for tests.
 *
 * @return the generic Avro schema held by this handler
 */
@VisibleForTesting
GenericAvroSchema getSchema() {
    return genericAvroSchema;
}

/**
 * Exposes the schema info this handler was constructed with; intended for tests.
 *
 * @return the schema info held by this handler
 */
@VisibleForTesting
SchemaInfo getSchemaInfo() {
    return this.schemaInfo;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,6 @@ public Object deserialize(ByteBuf payload) {
}
}

@Override
public Object deserialize(ByteBuf keyPayload, ByteBuf dataPayload) {
return null;
}

@Override
public Object extractField(int index, Object currentRecord) {
try {
Expand Down
Loading