Skip to content

Commit

Permalink
Add Joda time logical type conversion. (apache#6704)
Browse files Browse the repository at this point in the history
### Motivation

After upgrade to Apache Avro 1.9.x, the default time conversion changed to JSR-310. For forwarding compatibility, we'd better add the Joda time conversion.

related to apache#5938 

### Modifications

Add joda time conversions

### Verifying this change

New integration test added
  • Loading branch information
codelipenghui authored Apr 11, 2020
1 parent 364876b commit 854716f
Show file tree
Hide file tree
Showing 20 changed files with 313 additions and 75 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci-integration-process.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ jobs:
- name: clean disk
if: steps.docs.outputs.changed_only == 'no'
run: |
sudo swapoff -a
sudo rm -f /swapfile
sudo swapoff /swapfile
sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
docker rmi $(docker images -q) -f
df -h
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/ci-integration-thread.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ jobs:
- name: clean disk
if: steps.docs.outputs.changed_only == 'no'
run: |
sudo swapoff -a
sudo rm -f /swapfile
sudo swapoff /swapfile
sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
docker rmi $(docker images -q) -f
df -h
Expand Down
17 changes: 8 additions & 9 deletions .github/workflows/ci-unit-broker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,20 @@ jobs:
with:
java-version: 1.8

- name: Set up Maven
uses: apache/pulsar-test-infra/setup-maven@master
if: steps.docs.outputs.changed_only == 'no'
with:
maven-version: 3.6.1

- name: clean disk
if: steps.docs.outputs.changed_only == 'no'
run: |
sudo swapoff -a
sudo rm -f /swapfile
sudo swapoff /swapfile
sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
docker rmi $(docker images -q) -f
df -h
free -h
- name: Set up Maven
uses: apache/pulsar-test-infra/setup-maven@master
if: steps.docs.outputs.changed_only == 'no'
with:
maven-version: 3.6.1
- name: run unit tests install by skip tests
if: steps.docs.outputs.changed_only == 'no'
Expand Down
9 changes: 9 additions & 0 deletions .github/workflows/ci-unit.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@ jobs:
with:
maven-version: 3.6.1

- name: clean disk
if: steps.docs.outputs.changed_only == 'no'
run: |
sudo swapoff /swapfile
sudo rm -rf /swapfile /usr/share/dotnet /usr/local/lib/android /opt/ghc
sudo apt clean
docker rmi $(docker images -q) -f
df -h
- name: run unit tests install by skip tests
if: steps.docs.outputs.changed_only == 'no'
run: mvn clean install -DskipTests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ static <T> SchemaDefinitionBuilder<T> builder() {
*/
boolean getAlwaysAllowNull();

/**
* Get JSR310 conversion enabled.
*
* @return return true if enable JSR310 conversion. false means use Joda time conversion.
*/
boolean isJsr310ConversionEnabled();

/**
* Get schema class.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,22 @@ public interface SchemaDefinitionBuilder<T> {
*/
SchemaDefinitionBuilder<T> withAlwaysAllowNull(boolean alwaysAllowNull);

/**
* Set schema use JRS310 conversion or not.
*
* <p>Before Avro 1.9 the Joda time library was used for handling the logical date(time) values.
* But since the introduction of Java8 the Java Specification Request (JSR) 310 has been included,
* which greatly improves the handling of date and time natively. To keep forwarding compatibility,
* default is use Joda time conversion.
*
* <p>JSR310 conversion is recommended here. Joda time conversion is has been marked deprecated.
* In future versions, joda time conversion may be removed
*
* @param jsr310ConversionEnabled use JRS310 conversion or not, default is false for keep forwarding compatibility
* @return schema definition builder
*/
SchemaDefinitionBuilder<T> withJSR310ConversionEnabled(boolean jsr310ConversionEnabled);

/**
* Set schema info properties.
*
Expand Down
1 change: 0 additions & 1 deletion pulsar-client-messagecrypto-bc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,5 @@
<artifactId>bouncy-castle-bc-shaded</artifactId>
<version>${project.parent.version}</version>
</dependency>

</dependencies>
</project>
6 changes: 6 additions & 0 deletions pulsar-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,12 @@
</exclusions>
</dependency>

<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Conversions;
import org.apache.avro.data.JodaTimeConversions;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.reflect.ReflectData;
import org.apache.pulsar.client.api.Schema;
Expand All @@ -42,36 +43,14 @@
public class AvroSchema<T> extends StructSchema<T> {
private static final Logger LOG = LoggerFactory.getLogger(AvroSchema.class);

// the aim to fix avro's bug
// https://issues.apache.org/jira/browse/AVRO-1891 bug address explain
// fix the avro logical type read and write
static {
ReflectData reflectDataAllowNull = ReflectData.AllowNull.get();

reflectDataAllowNull.addLogicalTypeConversion(new Conversions.DecimalConversion());
reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.DateConversion());
reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
reflectDataAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());

ReflectData reflectDataNotAllowNull = ReflectData.get();

reflectDataNotAllowNull.addLogicalTypeConversion(new Conversions.DecimalConversion());
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.DateConversion());
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
reflectDataNotAllowNull.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
}

private ClassLoader pojoClassLoader;

private AvroSchema(SchemaInfo schemaInfo, ClassLoader pojoClassLoader) {
super(schemaInfo);
this.pojoClassLoader = pojoClassLoader;
setReader(new AvroReader<>(schema, pojoClassLoader));
setWriter(new AvroWriter<>(schema));
boolean jsr310ConversionEnabled = getJsr310ConversionEnabledFromSchemaInfo(schemaInfo);
setReader(new AvroReader<>(schema, pojoClassLoader, jsr310ConversionEnabled));
setWriter(new AvroWriter<>(schema, jsr310ConversionEnabled));
}

@Override
Expand Down Expand Up @@ -116,7 +95,8 @@ protected SchemaReader<T> loadReader(BytesSchemaVersion schemaVersion) {
log.info("Load schema reader for version({}), schema is : {}, schemaInfo: {}",
SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
schemaInfo.getSchemaDefinition(), schemaInfo.toString());
return new AvroReader<>(parseAvroSchema(schemaInfo.getSchemaDefinition()), schema, pojoClassLoader);
boolean jsr310ConversionEnabled = getJsr310ConversionEnabledFromSchemaInfo(schemaInfo);
return new AvroReader<>(parseAvroSchema(schemaInfo.getSchemaDefinition()), schema, pojoClassLoader, jsr310ConversionEnabled);
} else {
log.warn("No schema found for version({}), use latest schema : {}",
SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
Expand All @@ -125,4 +105,30 @@ protected SchemaReader<T> loadReader(BytesSchemaVersion schemaVersion) {
}
}

private static boolean getJsr310ConversionEnabledFromSchemaInfo(SchemaInfo schemaInfo) {
if (schemaInfo != null) {
return Boolean.parseBoolean(schemaInfo.getProperties()
.getOrDefault(SchemaDefinitionBuilderImpl.JSR310_CONVERSION_ENABLED, "false"));
}
return false;
}

public static void addLogicalTypeConversions(ReflectData reflectData, boolean jsr310ConversionEnabled) {
reflectData.addLogicalTypeConversion(new Conversions.DecimalConversion());
reflectData.addLogicalTypeConversion(new TimeConversions.DateConversion());
reflectData.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
reflectData.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
reflectData.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
if (jsr310ConversionEnabled) {
reflectData.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
} else {
try {
Class.forName("org.joda.time.DateTime");
reflectData.addLogicalTypeConversion(new JodaTimeConversions.TimestampConversion());
} catch (ClassNotFoundException e) {
// Skip if have not provide joda-time dependency.
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@
public class SchemaDefinitionBuilderImpl<T> implements SchemaDefinitionBuilder<T> {

public static final String ALWAYS_ALLOW_NULL = "__alwaysAllowNull";
public static final String JSR310_CONVERSION_ENABLED = "__jsr310ConversionEnabled";

/**
* the schema definition class
*/
private Class<T> clazz;

/**
* The flag of schema type always allow null
*
Expand All @@ -48,6 +50,13 @@ public class SchemaDefinitionBuilderImpl<T> implements SchemaDefinitionBuilder<T
*/
private boolean alwaysAllowNull = true;

/**
* The flag of use JSR310 conversion or Joda time conversion.
*
* If value is true, use JSR310 conversion in the Avro schema. Otherwise, use Joda time conversion.
*/
private boolean jsr310ConversionEnabled = false;

/**
* The schema info properties
*/
Expand All @@ -69,6 +78,12 @@ public SchemaDefinitionBuilder<T> withAlwaysAllowNull(boolean alwaysAllowNull) {
return this;
}

@Override
public SchemaDefinitionBuilder<T> withJSR310ConversionEnabled(boolean jsr310ConversionEnabled) {
this.jsr310ConversionEnabled = jsr310ConversionEnabled;
return this;
}

@Override
public SchemaDefinitionBuilder<T> addProperty(String key, String value) {
this.properties.put(key, value);
Expand Down Expand Up @@ -107,8 +122,10 @@ public SchemaDefinition<T> build() {
checkArgument(!(StringUtils.isNotBlank(jsonDef) && clazz != null),
"Not allowed to set pojo and jsonDef both for the schema definition.");

properties.put(ALWAYS_ALLOW_NULL, this.alwaysAllowNull ? "true" : "false");
return new SchemaDefinitionImpl(clazz, jsonDef, alwaysAllowNull, properties, supportSchemaVersioning);
properties.put(ALWAYS_ALLOW_NULL, String.valueOf(this.alwaysAllowNull));
properties.put(JSR310_CONVERSION_ENABLED, String.valueOf(this.jsr310ConversionEnabled));
return new SchemaDefinitionImpl(clazz, jsonDef, alwaysAllowNull, properties, supportSchemaVersioning,
jsr310ConversionEnabled);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,24 @@ public class SchemaDefinitionImpl<T> implements SchemaDefinition<T>{
* false you can define the field by yourself by the annotation@Nullable
*
*/
private boolean alwaysAllowNull;
private final boolean alwaysAllowNull;

private Map<String, String> properties;
private final Map<String, String> properties;

private String jsonDef;
private final String jsonDef;

private boolean supportSchemaVersioning;
private final boolean supportSchemaVersioning;

public SchemaDefinitionImpl(Class<T> pojo, String jsonDef, boolean alwaysAllowNull, Map<String,String> properties, boolean supportSchemaVersioning) {
private final boolean jsr310ConversionEnabled;

public SchemaDefinitionImpl(Class<T> pojo, String jsonDef, boolean alwaysAllowNull, Map<String,String> properties,
boolean supportSchemaVersioning, boolean jsr310ConversionEnabled) {
this.alwaysAllowNull = alwaysAllowNull;
this.properties = properties;
this.jsonDef = jsonDef;
this.pojo = pojo;
this.supportSchemaVersioning = supportSchemaVersioning;
this.jsr310ConversionEnabled = jsr310ConversionEnabled;
}
/**
* get schema whether always allow null or not
Expand All @@ -66,6 +70,11 @@ public boolean getAlwaysAllowNull() {
return alwaysAllowNull;
}

@Override
public boolean isJsr310ConversionEnabled() {
return jsr310ConversionEnabled;
}

/**
* Get json schema definition
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,15 @@
*/
package org.apache.pulsar.client.impl.schema.reader;

import org.apache.avro.Conversions;
import org.apache.avro.Schema;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.schema.SchemaReader;

import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -44,30 +43,21 @@ public AvroReader(Schema schema) {
this.reader = new ReflectDatumReader<>(schema);
}

public AvroReader(Schema schema, ClassLoader classLoader) {
public AvroReader(Schema schema, ClassLoader classLoader, boolean jsr310ConversionEnabled) {
if (classLoader != null) {
ReflectData reflectData = new ReflectData(classLoader);
reflectData.addLogicalTypeConversion(new Conversions.DecimalConversion());
reflectData.addLogicalTypeConversion(new TimeConversions.DateConversion());
reflectData.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
reflectData.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
reflectData.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
reflectData.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
AvroSchema.addLogicalTypeConversions(reflectData, jsr310ConversionEnabled);
this.reader = new ReflectDatumReader<>(schema, schema, reflectData);
} else {
this.reader = new ReflectDatumReader<>(schema);
}
}

public AvroReader(Schema writerSchema, Schema readerSchema, ClassLoader classLoader) {
public AvroReader(Schema writerSchema, Schema readerSchema, ClassLoader classLoader,
boolean jsr310ConversionEnabled) {
if (classLoader != null) {
ReflectData reflectData = new ReflectData(classLoader);
reflectData.addLogicalTypeConversion(new Conversions.DecimalConversion());
reflectData.addLogicalTypeConversion(new TimeConversions.DateConversion());
reflectData.addLogicalTypeConversion(new TimeConversions.TimeMillisConversion());
reflectData.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
reflectData.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
reflectData.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
AvroSchema.addLogicalTypeConversions(reflectData, jsr310ConversionEnabled);
this.reader = new ReflectDatumReader<>(writerSchema, readerSchema, reflectData);
} else {
this.reader = new ReflectDatumReader<>(writerSchema, readerSchema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import org.apache.avro.Schema;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.schema.SchemaWriter;
import org.apache.pulsar.client.impl.schema.AvroSchema;

import java.io.ByteArrayOutputStream;

Expand All @@ -33,9 +35,15 @@ public class AvroWriter<T> implements SchemaWriter<T> {
private ByteArrayOutputStream byteArrayOutputStream;

public AvroWriter(Schema schema) {
this(schema, false);
}

public AvroWriter(Schema schema, boolean jsr310ConversionEnabled) {
this.byteArrayOutputStream = new ByteArrayOutputStream();
this.encoder = EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, this.encoder);
this.writer = new ReflectDatumWriter<>(schema);
ReflectData reflectData = new ReflectData();
AvroSchema.addLogicalTypeConversions(reflectData, jsr310ConversionEnabled);
this.writer = new ReflectDatumWriter<>(schema, reflectData);
}

@Override
Expand Down
Loading

0 comments on commit 854716f

Please sign in to comment.