Skip to content

Commit

Permalink
Support multiple Avro schema version in Pulsar SQL (#4847)
Browse files Browse the repository at this point in the history
Support multiple Avro schema version in Pulsar SQL
  • Loading branch information
congbobo184 authored May 19, 2020
1 parent d55bc00 commit 097108a
Show file tree
Hide file tree
Showing 24 changed files with 466 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@

import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.LongSchemaVersion;
import org.apache.pulsar.common.schema.LongSchemaVersion;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.common.protocol.schema.SchemaStorage;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.LongSchemaVersion;
import org.apache.pulsar.common.protocol.schema.StoredSchema;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.zookeeper.CreateMode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaHash;
import org.apache.pulsar.common.schema.LongSchemaVersion;
import org.apache.pulsar.common.protocol.schema.SchemaStorage;
import org.apache.pulsar.common.protocol.schema.StoredSchema;
import org.apache.pulsar.common.schema.SchemaType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.nio.ByteBuffer;
import org.apache.bookkeeper.client.api.BKException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.common.schema.LongSchemaVersion;
import org.apache.pulsar.zookeeper.LocalZooKeeperCache;
import org.testng.annotations.Test;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.pulsar.broker.service.schema.SchemaRegistry.SchemaAndMetadata;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.schema.LongSchemaVersion;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.testng.annotations.AfterMethod;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

import org.apache.avro.reflect.ReflectData;
import org.apache.avro.Schema.Parser;
import org.apache.pulsar.broker.service.schema.LongSchemaVersion;
import org.apache.pulsar.common.schema.LongSchemaVersion;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException.IncompatibleSchemaException;
import org.apache.pulsar.client.api.PulsarClientException.InvalidMessageException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;

abstract class AbstractSchema<T> implements Schema<T> {
public abstract class AbstractSchema<T> implements Schema<T> {

/**
* Check if the message read able length length is a valid object for this schema.
Expand All @@ -47,7 +47,7 @@ void validate(ByteBuf byteBuf) {
* the byte buffer to decode
* @return the deserialized object
*/
abstract T decode(ByteBuf byteBuf);
public abstract T decode(ByteBuf byteBuf);
/**
* Decode a byteBuf into an object using a given version.
*
Expand All @@ -57,7 +57,7 @@ void validate(ByteBuf byteBuf) {
* the schema version to decode the object. null indicates using latest version.
* @return the deserialized object
*/
T decode(ByteBuf byteBuf, byte[] schemaVersion) {
public T decode(ByteBuf byteBuf, byte[] schemaVersion) {
// ignore version by default (most of the primitive schema implementations ignore schema version)
return decode(byteBuf);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ public T decode(byte[] bytes) {
@Override
public T decode(byte[] bytes, byte[] schemaVersion) {
try {
return readerCache.get(BytesSchemaVersion.of(schemaVersion)).read(bytes);
return schemaVersion == null ? decode(bytes) :
readerCache.get(BytesSchemaVersion.of(schemaVersion)).read(bytes);
} catch (ExecutionException | AvroTypeException e) {
if (e instanceof AvroTypeException) {
throw new SchemaSerializationException(e);
Expand All @@ -122,7 +123,8 @@ public T decode(ByteBuf byteBuf) {
@Override
public T decode(ByteBuf byteBuf, byte[] schemaVersion) {
try {
return readerCache.get(BytesSchemaVersion.of(schemaVersion)).read(new ByteBufInputStream(byteBuf));
return schemaVersion == null ? decode(byteBuf) :
readerCache.get(BytesSchemaVersion.of(schemaVersion)).read(new ByteBufInputStream(byteBuf));
} catch (ExecutionException e) {
LOG.error("Can't get generic schema for topic {} schema version {}",
schemaInfoProvider.getTopicName(), Hex.encodeHexString(schemaVersion), e);
Expand Down Expand Up @@ -180,7 +182,7 @@ protected static org.apache.avro.Schema parseAvroSchema(String schemaJson) {
return parser.parse(schemaJson);
}

protected static <T> SchemaInfo parseSchemaInfo(SchemaDefinition<T> schemaDefinition, SchemaType schemaType) {
public static <T> SchemaInfo parseSchemaInfo(SchemaDefinition<T> schemaDefinition, SchemaType schemaType) {
return SchemaInfo.builder()
.schema(createAvroSchema(schemaDefinition).toString().getBytes(UTF_8))
.properties(schemaDefinition.getProperties())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@ public interface RawMessage {
*/
Optional<String> getKey();

/**
* Get the schema verison of the message.
*
* @return the schema version of the message
*/
byte[] getSchemaVersion();

/**
* Get byteBuf of the key.
*
Expand All @@ -115,5 +122,4 @@ public interface RawMessage {
* @return true if the key is base64 encoded, false otherwise
*/
boolean hasBase64EncodedKey();

}
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,14 @@ public Optional<String> getKey() {
}

@Override
public byte[] getSchemaVersion() {
if (msgMetadata != null && msgMetadata.get().hasSchemaVersion()) {
return msgMetadata.get().getSchemaVersion().toByteArray();
} else {
return null;
}
}

public Optional<ByteBuf> getKeyBytes() {
if (getKey().isPresent()) {
if (hasBase64EncodedKey()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.schema;
package org.apache.pulsar.common.schema;

import com.google.common.base.MoreObjects;
import java.nio.ByteBuffer;
import java.util.Objects;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;

/**
* Long schema version.
*/
public class LongSchemaVersion implements SchemaVersion {
private final long version;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/**
* Implementation of the common of the pulsar schema.
*/
package org.apache.pulsar.common.schema;
Original file line number Diff line number Diff line change
Expand Up @@ -18,92 +18,87 @@
*/
package org.apache.pulsar.sql.presto;

import com.google.common.annotations.VisibleForTesting;

import io.airlift.log.Logger;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.FastThreadLocal;
import java.io.IOException;

import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;

import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.SchemaInfo;


/**
* Schema handler for payload in the Avro format.
*/
public class AvroSchemaHandler implements SchemaHandler {

private final DatumReader<GenericRecord> datumReader;

private final List<PulsarColumnHandle> columnHandles;

private static final FastThreadLocal<BinaryDecoder> decoders =
new FastThreadLocal<>();
private final GenericAvroSchema genericAvroSchema;

private final SchemaInfo schemaInfo;

private static final Logger log = Logger.get(AvroSchemaHandler.class);

public AvroSchemaHandler(Schema schema, List<PulsarColumnHandle> columnHandles) {
this.datumReader = new GenericDatumReader<>(schema);
AvroSchemaHandler(TopicName topicName,
PulsarConnectorConfig pulsarConnectorConfig,
SchemaInfo schemaInfo,
List<PulsarColumnHandle> columnHandles) throws PulsarClientException {
this(new PulsarSqlSchemaInfoProvider(topicName,
pulsarConnectorConfig.getPulsarAdmin()), schemaInfo, columnHandles);
}

AvroSchemaHandler(PulsarSqlSchemaInfoProvider pulsarSqlSchemaInfoProvider,
SchemaInfo schemaInfo, List<PulsarColumnHandle> columnHandles) {
this.schemaInfo = schemaInfo;
this.genericAvroSchema = new GenericAvroSchema(schemaInfo);
this.genericAvroSchema.setSchemaInfoProvider(pulsarSqlSchemaInfoProvider);
this.columnHandles = columnHandles;
}

@Override
public Object deserialize(ByteBuf payload) {

ByteBuf heapBuffer = null;
try {
BinaryDecoder decoderFromCache = decoders.get();

// Make a copy into a heap buffer, since Avro cannot deserialize directly from direct memory
int size = payload.readableBytes();
heapBuffer = ByteBufAllocator.DEFAULT.heapBuffer(size, size);
heapBuffer.writeBytes(payload);

BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(heapBuffer.array(), heapBuffer.arrayOffset(),
heapBuffer.readableBytes(), decoderFromCache);
if (decoderFromCache == null) {
decoders.set(decoder);
}
return this.datumReader.read(null, decoder);
} catch (IOException e) {
log.error(e);
} finally {
ReferenceCountUtil.safeRelease(heapBuffer);
}
return null;
return genericAvroSchema.decode(payload);
}

@Override
public Object deserialize(ByteBuf keyPayload, ByteBuf dataPayload) {
return null;
public Object deserialize(ByteBuf payload, byte[] schemaVersion) {
return genericAvroSchema.decode(payload, schemaVersion);
}

@Override
public Object extractField(int index, Object currentRecord) {
try {
GenericRecord record = (GenericRecord) currentRecord;
GenericAvroRecord record = (GenericAvroRecord) currentRecord;
PulsarColumnHandle pulsarColumnHandle = this.columnHandles.get(index);
Integer[] positionIndices = pulsarColumnHandle.getPositionIndices();
Object curr = record.get(positionIndices[0]);
if (curr == null) {
return null;
}
if (positionIndices.length > 0) {
for (int i = 1; i < positionIndices.length; i++) {
curr = ((GenericRecord) curr).get(positionIndices[i]);
if (curr == null) {
return null;
}
String[] names = pulsarColumnHandle.getFieldNames();

if (names.length == 1) {
return record.getField(pulsarColumnHandle.getFieldNames()[0]);
} else {
for (int i = 0; i < names.length - 1; i++) {
record = (GenericAvroRecord) record.getField(names[i]);
}
return record.getField(names[names.length - 1]);
}
return curr;
} catch (Exception ex) {
log.debug(ex, "%s", ex);
}
return null;
}

@VisibleForTesting
GenericAvroSchema getSchema() {
return this.genericAvroSchema;
}

@VisibleForTesting
SchemaInfo getSchemaInfo() {
return schemaInfo;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,6 @@ public Object deserialize(ByteBuf payload) {
}
}

@Override
public Object deserialize(ByteBuf keyPayload, ByteBuf dataPayload) {
return null;
}

@Override
public Object extractField(int index, Object currentRecord) {
try {
Expand Down
Loading

0 comments on commit 097108a

Please sign in to comment.