Skip to content

Commit

Permalink
Upgrade Avro to 1.9.1 (#5938)
Browse files Browse the repository at this point in the history
### Motivation

Currently, Pulsar uses Avro 1.8.2, a version released two years ago. The latest version of Avro is 1.9.1, which uses FasterXML's Jackson 2.x instead of Codehaus's Jackson 1.x. Jackson is prone to security issues, so we should not keep using older versions.
https://blog.godatadriven.com/apache-avro-1-9-release

### Modifications

Avro 1.9 has some major changes:

- The library used to handle logical datetime values has changed from Joda-Time to JSR-310 (apache/avro#631)
- Namespaces no longer include "$" when generating schemas containing inner classes using ReflectData (apache/avro#283)
- Validation of default values has been enabled (apache/avro#288). This results in a validation error when parsing the following schema:
```json
{
  "name": "fieldName",
  "type": [
    "null",
    "string"
  ],
  "default": "defaultValue"
}
```
The default value of a nullable field must be null (cf. https://issues.apache.org/jira/browse/AVRO-1803), and the default value of the field as above is actually null. However, this PR disables the validation in order to maintain the traditional behavior.
  • Loading branch information
Masahiro Sakamoto authored and sijie committed Jan 6, 2020
1 parent 9d94860 commit d6f240e
Show file tree
Hide file tree
Showing 27 changed files with 116 additions and 119 deletions.
12 changes: 2 additions & 10 deletions distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,6 @@ The Apache Software License, Version 2.0
* JCommander -- com.beust-jcommander-1.48.jar
* High Performance Primitive Collections for Java -- com.carrotsearch-hppc-0.7.3.jar
* Jackson
- org.codehaus.jackson-jackson-core-asl-1.9.13.jar
- org.codehaus.jackson-jackson-mapper-asl-1.9.13.jar
- com.fasterxml.jackson.core-jackson-annotations-2.10.1.jar
- com.fasterxml.jackson.core-jackson-core-2.10.1.jar
- com.fasterxml.jackson.core-jackson-databind-2.10.1.jar
Expand Down Expand Up @@ -454,13 +452,11 @@ The Apache Software License, Version 2.0
* OpenCensus
- io.opencensus-opencensus-api-0.18.0.jar
- io.opencensus-opencensus-contrib-grpc-metrics-0.18.0.jar
* Paranamer
- com.thoughtworks.paranamer-paranamer-2.7.jar
* Jodah
- net.jodah-typetools-0.5.0.jar
* Apache Avro
- org.apache.avro-avro-1.8.2.jar
- org.apache.avro-avro-protobuf-1.8.2.jar
- org.apache.avro-avro-1.9.1.jar
- org.apache.avro-avro-protobuf-1.9.1.jar
* Apache Curator
- org.apache.curator-curator-client-4.0.1.jar
- org.apache.curator-curator-framework-4.0.1.jar
Expand Down Expand Up @@ -568,10 +564,6 @@ Eclipse Public License 1.0 -- licenses/LICENSE-AspectJ.txt
- org.aspectj-aspectjrt-1.9.2.jar
- org.aspectj-aspectjweaver-1.9.2.jar

Public Domain
* XZ for Java -- licenses/LICENSE-xz.txt
- org.tukaani-xz-1.5.jar

Public Domain (CC0) -- licenses/LICENSE-CC0.txt
* Reactive Streams -- org.reactivestreams-reactive-streams-1.0.2.jar

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ flexible messaging model and an intuitive client API.</description>
<kafka-client.version>2.3.0</kafka-client.version>
<rabbitmq-client.version>5.1.1</rabbitmq-client.version>
<aws-sdk.version>1.11.297</aws-sdk.version>
<avro.version>1.8.2</avro.version>
<avro.version>1.9.1</avro.version>
<joda.version>2.10.1</joda.version>
<jclouds.version>2.1.1</jclouds.version>
<sqlite-jdbc.version>3.8.11.2</sqlite-jdbc.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,11 @@ public void checkCompatible(Iterable<SchemaData> from, SchemaData to, SchemaComp
try {
for (SchemaData schemaData : from) {
Schema.Parser parser = new Schema.Parser();
parser.setValidateDefaults(false);
fromList.addFirst(parser.parse(new String(schemaData.getData(), UTF_8)));
}
Schema.Parser parser = new Schema.Parser();
parser.setValidateDefaults(false);
Schema toSchema = parser.parse(new String(to.getData(), UTF_8));
SchemaValidator schemaValidator = createSchemaValidator(strategy);
schemaValidator.validate(toSchema, fromList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ private boolean isAvroSchema(SchemaData schemaData) {
try {

Schema.Parser fromParser = new Schema.Parser();
fromParser.setValidateDefaults(false);
Schema fromSchema = fromParser.parse(new String(schemaData.getData(), UTF_8));
return true;
} catch (SchemaParseException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public void validate(SchemaData schemaData) throws InvalidSchemaDataException {

try {
Schema.Parser avroSchemaParser = new Schema.Parser();
avroSchemaParser.setValidateDefaults(false);
avroSchemaParser.parse(new String(data, UTF_8));
} catch (SchemaParseException e) {
if (schemaData.getType() == SchemaType.JSON) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,49 +33,49 @@ public abstract class BaseAvroSchemaCompatibilityTest {

private static final String schemaJson1 =
"{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}]}";
".AvroSchemaCompatibilityCheckTest\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}]}";
private static final SchemaData schemaData1 = getSchemaData(schemaJson1);

private static final String schemaJson2 =
"{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}," +
".AvroSchemaCompatibilityCheckTest\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}," +
"{\"name\":\"field2\",\"type\":\"string\",\"default\":\"foo\"}]}";
private static final SchemaData schemaData2 = getSchemaData(schemaJson2);

private static final String schemaJson3 =
"{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org" +
".apache.pulsar.broker.service.schema.AvroSchemaCompatibilityCheckTest$\"," +
".apache.pulsar.broker.service.schema.AvroSchemaCompatibilityCheckTest\"," +
"\"fields\":[{\"name\":\"field1\",\"type\":\"string\"},{\"name\":\"field2\",\"type\":\"string\"}]}";
private static final SchemaData schemaData3 = getSchemaData(schemaJson3);

private static final String schemaJson4 =
"{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1_v2\",\"type\":\"string\"," +
".AvroSchemaCompatibilityCheckTest\",\"fields\":[{\"name\":\"field1_v2\",\"type\":\"string\"," +
"\"aliases\":[\"field1\"]}]}";
private static final SchemaData schemaData4 = getSchemaData(schemaJson4);

private static final String schemaJson5 =
"{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1\",\"type\":[\"null\"," +
".AvroSchemaCompatibilityCheckTest\",\"fields\":[{\"name\":\"field1\",\"type\":[\"null\"," +
"\"string\"]}]}";
private static final SchemaData schemaData5 = getSchemaData(schemaJson5);

private static final String schemaJson6 =
"{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1\",\"type\":[\"null\"," +
".AvroSchemaCompatibilityCheckTest\",\"fields\":[{\"name\":\"field1\",\"type\":[\"null\"," +
"\"string\",\"int\"]}]}";
private static final SchemaData schemaData6 = getSchemaData(schemaJson6);

private static final String schemaJson7 =
"{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}," +
".AvroSchemaCompatibilityCheckTest\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}," +
"{\"name\":\"field2\",\"type\":\"string\",\"default\":\"foo\"},{\"name\":\"field3\"," +
"\"type\":\"string\",\"default\":\"bar\"}]}";
private static final SchemaData schemaData7 = getSchemaData(schemaJson7);

private static final String schemaJson8 =
"{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}," +
".AvroSchemaCompatibilityCheckTest\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}," +
"{\"name\":\"field2\",\"type\":\"string\"}]}";
private static final SchemaData schemaData8 = getSchemaData(schemaJson8);

Expand All @@ -96,10 +96,10 @@ public void testBackwardCompatibility() {
Assert.assertFalse(schemaCompatibilityCheck.isCompatible(schemaData1, schemaData3,
SchemaCompatibilityStrategy.BACKWARD),
"adding a field without default is NOT backwards compatible");
// Modifying a field name is not backwards compatible
Assert.assertFalse(schemaCompatibilityCheck.isCompatible(schemaData1, schemaData4,
SchemaCompatibilityStrategy.BACKWARD),
"Modifying a field name is not backwards compatible");
// Modifying a field name with an alias is backwards compatible
Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData1, schemaData4,
SchemaCompatibilityStrategy.BACKWARD),
"Modifying a field name with an alias is backwards compatible");
// evolving field to a union is backwards compatible
Assert.assertTrue(schemaCompatibilityCheck.isCompatible(schemaData1, schemaData5,
SchemaCompatibilityStrategy.BACKWARD),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,18 +293,18 @@ public void dontReAddExistingSchemaInMiddle() throws Exception {
public void checkIsCompatible() throws Exception {
String schemaJson1 =
"{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}]}";
".AvroSchemaCompatibilityCheckTest\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}]}";
SchemaData schemaData1 = getSchemaData(schemaJson1);

String schemaJson2 =
"{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}," +
".AvroSchemaCompatibilityCheckTest\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}," +
"{\"name\":\"field2\",\"type\":\"string\",\"default\":\"foo\"}]}";
SchemaData schemaData2 = getSchemaData(schemaJson2);

String schemaJson3 =
"{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}," +
".AvroSchemaCompatibilityCheckTest\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}," +
"{\"name\":\"field2\",\"type\":\"string\"}]}";
SchemaData schemaData3 = getSchemaData(schemaJson3);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static org.testng.Assert.assertEquals;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

import io.netty.buffer.Unpooled;
Expand All @@ -44,8 +45,6 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import avro.shaded.com.google.common.collect.Lists;

public class MessageParserTest extends MockedPulsarServiceBaseTest {

@BeforeMethod
Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-go/pulsar/schemaDef_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestSchemaDef(t *testing.T) {
_, err = initAvroCodec(errSchemaDef4)
assert.NotNil(t, err)

errSchemaDef5 := "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"operation.createJsonConsumer$\"," +
errSchemaDef5 := "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"operation.createJsonConsumer\"," +
"\"fields\":[{\"name\":\"ID\",\"type\":\"byte\"},{\"name\":\"Name\",\"type\":\":[\"null\",\"string\"],\"default\":null\"}]}"
_, err = initAvroCodec(errSchemaDef5)
assert.NotNil(t, err)
Expand Down
6 changes: 0 additions & 6 deletions pulsar-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,6 @@
<artifactId>jackson-module-jsonSchema</artifactId>
</dependency>

<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>net.jcip</groupId>
<artifactId>jcip-annotations</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,38 +41,27 @@
public class AvroSchema<T> extends StructSchema<T> {
private static final Logger LOG = LoggerFactory.getLogger(AvroSchema.class);

// the aim to fix avro's bug
// https://issues.apache.org/jira/browse/AVRO-1891 bug address explain
// fix the avro logical type read and write
// the aim to fix avro's bug
// https://issues.apache.org/jira/browse/AVRO-1891 bug address explain
// fix the avro logical type read and write
static {
try {
ReflectData reflectDataAllowNull = ReflectData.AllowNull.get();
ReflectData reflectDataAllowNull = ReflectData.AllowNull.get();

reflectDataAllowNull.addLogicalTypeConversion(new Conversions.DecimalConversion());
reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.DateConversion());
reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.LossyTimeMicrosConversion());
reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.LossyTimestampMicrosConversion());
reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampConversion());
reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimeConversion());
reflectDataAllowNull.addLogicalTypeConversion(new Conversions.DecimalConversion());
reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.DateConversion());
reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());

ReflectData reflectDataNotAllowNull = ReflectData.get();
ReflectData reflectDataNotAllowNull = ReflectData.get();

reflectDataNotAllowNull.addLogicalTypeConversion(new Conversions.DecimalConversion());
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.DateConversion());
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampConversion());
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.LossyTimeMicrosConversion());
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.LossyTimestampMicrosConversion());
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimeConversion());
} catch (Throwable t) {
if (LOG.isDebugEnabled()) {
LOG.debug("Avro logical types are not available. If you are going to use avro logical types, " +
"you can include `joda-time` in your dependency.");
}
}
reflectDataNotAllowNull.addLogicalTypeConversion(new Conversions.DecimalConversion());
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.DateConversion());
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
}

private AvroSchema(SchemaInfo schemaInfo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static java.nio.charset.StandardCharsets.UTF_8;

import java.lang.reflect.Field;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

Expand All @@ -29,6 +30,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import org.apache.avro.AvroTypeException;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Parser;
import org.apache.avro.reflect.ReflectData;
import org.apache.commons.codec.binary.Hex;
Expand Down Expand Up @@ -133,14 +135,34 @@ protected static org.apache.avro.Schema createAvroSchema(SchemaDefinition schema
if (StringUtils.isNotBlank(schemaDefinition.getJsonDef())) {
return parseAvroSchema(schemaDefinition.getJsonDef());
} else if (pojo != null) {
return schemaDefinition.getAlwaysAllowNull() ? ReflectData.AllowNull.get().getSchema(pojo) : ReflectData.get().getSchema(pojo);
ThreadLocal<Boolean> validateDefaults = null;

try {
Field validateDefaultsField = Schema.class.getDeclaredField("VALIDATE_DEFAULTS");
validateDefaultsField.setAccessible(true);
validateDefaults = (ThreadLocal<Boolean>) validateDefaultsField.get(null);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new RuntimeException("Cannot disable validation of default values", e);
}

final boolean savedValidateDefaults = validateDefaults.get();

try {
// Disable validation of default values for compatibility
validateDefaults.set(false);
return schemaDefinition.getAlwaysAllowNull() ? ReflectData.AllowNull.get().getSchema(pojo)
: ReflectData.get().getSchema(pojo);
} finally {
validateDefaults.set(savedValidateDefaults);
}
} else {
throw new RuntimeException("Schema definition must specify pojo class or schema json definition");
}
}

protected static org.apache.avro.Schema parseAvroSchema(String schemaJson) {
final Parser parser = new Parser();
parser.setValidateDefaults(false);
return parser.parse(schemaJson);
}

Expand Down
Loading

0 comments on commit d6f240e

Please sign in to comment.