
Conversation

@chenjunjiedada
Collaborator

This is a sub-PR of #1237; it changes the current FlinkParquetReader to use a schema visitor.

@chenjunjiedada
Collaborator Author

@openinx, would you mind taking another look at this?

}

@Override
public ParquetValueReader<?> struct(Types.StructType ignored, GroupType struct,
Member

It seems we could return the concrete parameter type RowData here. Can you change it to ParquetValueReader<RowData>?

Collaborator Author

You are right. Fixed.


private static class FallbackReadBuilder extends TypeWithSchemaVisitor<ParquetValueReader<?>> {
private MessageType type;
private final TypeWithSchemaVisitor<ParquetValueReader<?>> builder;
Member

Q: Is there a problem here? Doesn't FallbackReadBuilder need to implement these methods?

  1. public T list(Types.ListType iList, GroupType array, T element) ;
  2. public T map(Types.MapType iMap, GroupType map, T key, T value) ;
  3. public T primitive(org.apache.iceberg.types.Type.PrimitiveType iPrimitive, PrimitiveType primitive)

Collaborator Author

It does. I forgot that it no longer extends ReaderBuilder but its parent class. This is a bit more complex than directly extending ReaderBuilder and overriding the necessary methods. @rdblue, what do you think?

Contributor

I think this refactor should be done in a separate commit. Let's not over-complicate this one. We can judge the value of the change better when it is isolated in its own PR.

Collaborator Author

OK, sounds good to me.

}
}

private static class ReadBuilder extends TypeWithSchemaVisitor<ParquetValueReader<?>> {
Member

Q: Is it necessary to abstract a common ReadBuilder and have GenericParquetReader, SparkParquetReader, and FlinkParquetReader share it? I mean we could define different Reader(s) for each engine's data type, for example Flink's StructReader and Spark's StructReader, while the implementation of TypeWithSchemaVisitor could be shared. Does that make sense? CC @rdblue

I raise this issue because almost all the code in ReadBuilder is the same, and separate copies take extra resources (from both contributors and reviewers) to maintain.

Collaborator Author (@chenjunjiedada, Jul 31, 2020)

+1 to refactoring this. The struct/list/map methods are mostly the same; only the primitive methods differ because of differences in data models. This should also apply to the Avro and ORC ReadBuilders.

Contributor

Yes, I think we should refactor this but I'd prefer to do it separately. I'm okay continuing with the current approach in this PR and refactoring later or in parallel.

Collaborator Author (@chenjunjiedada, Aug 3, 2020)

Sounds good to me; we can take a look at WriteBuilder as well after all these readers and writers get in.

Member

Better to create an issue to track this; once the readers and writers get in, we can do the refactoring in that issue.

Collaborator Author

Created #1294.

Contributor

Hi @chenjunjiedada, I don't have a strong opinion about Avro; it is simple enough, and it looks like most of the code is tied to Flink data structures. It may not be worth refactoring.

Collaborator Author

Thanks @JingsongLi for your input; I will take another look when we start that task.

@Override
public TimestampData read(TimestampData ignored) {
long value = readLong();
return TimestampData.fromInstant(Instant.ofEpochSecond(value / 1000_000, (value % 1000_000) * 1000));
Member

Is this timestamp with zone or without zone? If it's timestamp with zone, the value could be negative, so we can't just use / and %; instead we should use Math.floorDiv and Math.floorMod. Please see pull request #1271. (In Java, -5 / 2 = -2 while Math.floorDiv(-5, 2) = -3, and we actually need -3 when computing the epoch second.)

Spark's Parquet TimestampMillisReader doesn't have this issue because it does not depend on any div or mod operation.
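For illustration, a minimal standalone sketch (not part of the PR) of why floor division matters for pre-epoch values, assuming a microseconds-since-epoch long:

import java.time.Instant;

public class FloorDivExample {
  public static void main(String[] args) {
    long micros = -5L; // five microseconds before the epoch
    System.out.println(micros / 1_000_000);               // 0   (truncates toward zero)
    System.out.println(Math.floorDiv(micros, 1_000_000)); // -1  (rounds toward negative infinity)
    System.out.println(Math.floorMod(micros, 1_000_000)); // 999995
    Instant ts = Instant.ofEpochSecond(
        Math.floorDiv(micros, 1_000_000),
        Math.floorMod(micros, 1_000_000) * 1_000);         // micros -> nano adjustment
    System.out.println(ts);                                // 1969-12-31T23:59:59.999995Z
  }
}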

Collaborator Author

Thanks for pointing this out. Updated this accordingly.

@Override
public Integer read(Integer reuse) {
// Flink only supports millisecond, so we discard microseconds in millisecond
return (int) column.nextLong() / 1000;
Member

ditto.

Collaborator Author

It looks like Iceberg's TimeType doesn't have a time zone attribute. Do we need to handle time zone logic here?

I updated this to use Math.floorDiv anyway.

}

@Override
@SuppressWarnings("unchecked")
Member

Could it be removed now?

Collaborator Author

Done.

this.values = new Object[20];
} else {
Object[] old = values;
this.values = new Object[old.length << 2];
Member

Is this right? Should it be old.length << 1? CC @rdblue, does Spark's ReusableArrayData have the same issue?

Member

I still think we don't need to grow the array by 4x; doubling the array space is enough.
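As a hedged sketch of the doubling strategy being suggested (the class name here is illustrative, not the PR's ReusableArrayData):

class GrowableBuffer {
  private Object[] values;

  void grow() {
    if (values == null) {
      this.values = new Object[20];
    } else {
      Object[] old = values;
      // << 1 doubles the capacity; << 2 would quadruple it and over-allocate.
      this.values = new Object[old.length << 1];
      System.arraycopy(old, 0, values, 0, old.length);
    }
  }
}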

@chenjunjiedada
Collaborator Author

@openinx, thanks for your comments. I just addressed them; would you please take another look if you have time?

@chenjunjiedada
Collaborator Author

@openinx @rdblue, would you please take another look? I just removed the FallbackReadBuilder and renamed the confusing variable.


@Override
public ParquetValueReader<RowData> message(Types.StructType expected, MessageType message,
List<ParquetValueReader<?>> fieldReaders) {
Member

nit: could you format this code? It seems we are missing the indentation.


@Override
public ParquetValueReader<RowData> struct(Types.StructType expected, GroupType struct,
List<ParquetValueReader<?>> fieldReaders) {
Member

nit: indentation?

import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;

public class ParquetWithFlinkSchemaVisitor<T> {
Member

For this PR (the Flink Parquet reader), it seems we don't need this writer visitor.

Collaborator Author

Yes, let me remove this.

}


public static Iterable<Record> generateRecords(Schema schema, int numRecords, long seed) {
Member

ditto

Collaborator Author

Done.


public static Iterable<Row> generateFallbackData(Schema schema, int numRecords, long seed, long numDictRows) {
return generateData(schema, numRecords, () -> new FallbackGenerator(seed, numDictRows));
public static Iterable<Record> generateFallbackRecords(Schema schema, int numRecords, long seed, long numDictRows) {
Member

ditto.


public static Iterable<Row> generateDictionaryEncodableData(Schema schema, int numRecords, long seed) {
return generateData(schema, numRecords, () -> new DictionaryEncodedGenerator(seed));
public static Iterable<Record> generateDictionaryEncodableRecords(Schema schema, int numRecords, long seed) {
Member

ditto.

Collaborator Author

Done.

import static org.apache.iceberg.flink.data.RandomData.COMPLEX_SCHEMA;

public class TestFlinkParquetReaderWriter {
public class TestFlinkParquetReader {
Member

Better to extend org.apache.iceberg.data.DataTest because it provides more test cases that COMPLEX_SCHEMA does not cover. (I used COMPLEX_SCHEMA before because I didn't know there was a better testing approach.)

Collaborator Author

Makes sense to me; updated.

Comment on lines 68 to 118
case BOOLEAN:
Assert.assertEquals("boolean value should be equal", value, actual.getBoolean(i));
break;
case INTEGER:
Assert.assertEquals("int value should be equal", value, actual.getInt(i));
break;
case LONG:
Assert.assertEquals("long value should be equal", value, actual.getLong(i));
break;
case FLOAT:
Assert.assertEquals("float value should be equal", value, actual.getFloat(i));
break;
case DOUBLE:
Assert.assertEquals("double should be equal", value, actual.getDouble(i));
break;
case STRING:
Assert.assertTrue("Should expect a CharSequence", value instanceof CharSequence);
Assert.assertEquals("string should be equal", String.valueOf(value), actual.getString(i).toString());
break;
case DATE:
Assert.assertTrue("Should expect a Date", value instanceof LocalDate);
LocalDate date = ChronoUnit.DAYS.addTo(EPOCH_DAY, actual.getInt(i));
Assert.assertEquals("date should be equal", value, date);
break;
case TIME:
Assert.assertTrue("Should expect a LocalTime", value instanceof LocalTime);
int milliseconds = (int) (((LocalTime) value).toNanoOfDay() / 1000_000);
Assert.assertEquals("time millis should be equal", milliseconds, actual.getInt(i));
break;
case TIMESTAMP:
if (((Types.TimestampType) type.asPrimitiveType()).shouldAdjustToUTC()) {
Assert.assertTrue("Should expect a OffsetDataTime", value instanceof OffsetDateTime);
OffsetDateTime ts = (OffsetDateTime) value;
Assert.assertEquals("OffsetDataTime should be equal", ts.toLocalDateTime(),
actual.getTimestamp(i, 6).toLocalDateTime());
} else {
Assert.assertTrue("Should expect a LocalDataTime", value instanceof LocalDateTime);
LocalDateTime ts = (LocalDateTime) value;
Assert.assertEquals("LocalDataTime should be equal", ts,
actual.getTimestamp(i, 6).toLocalDateTime());
}
break;
case FIXED:
Assert.assertTrue("Should expect byte[]", value instanceof byte[]);
Assert.assertArrayEquals("binary should be equal", (byte[]) value, actual.getBinary(i));
break;
case BINARY:
Assert.assertTrue("Should expect a ByteBuffer", value instanceof ByteBuffer);
Assert.assertArrayEquals("binary should be equal", ((ByteBuffer) value).array(), actual.getBinary(i));
break;
case DECIMAL:
Member

It seems we share the same primitive value assertions between assertRowData and assertArrayValues; could this be a common method?

Collaborator Author (@chenjunjiedada, Aug 4, 2020)

I tried to extract this into one common method with a signature like assertData(Type type, Record expected, Object actual) so that we can determine actual via instanceof, but we still have to call the getter methods with casts, such as ((RowData) actual).getInt(i) and ((ArrayData) actual).getInt(i), so the code would still look about as duplicated as it does now. Does that make sense to you?

Member

I tried to reuse that big switch-case. How about the following approach? (I did not finish all the code, but it seems it should work for reuse.)

public class TestHelpers {
  private TestHelpers() {
  }

  private static final OffsetDateTime EPOCH = Instant.ofEpochMilli(0L).atOffset(ZoneOffset.UTC);
  private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();

  public static void assertRowData(Type type, RowType rowType, Record expectedRecord, RowData actualRowData) {
    if (expectedRecord == null && actualRowData == null) {
      return;
    }

    Assert.assertTrue("expected Record and actual RowData should be both null or not null",
        expectedRecord != null && actualRowData != null);

    List<Type> types = new ArrayList<>();
    for (Types.NestedField field : type.asStructType().fields()) {
      types.add(field.type());
    }

    for (int i = 0; i < types.size(); i += 1) {
      if (expectedRecord.get(i) == null) {
        Assert.assertTrue(actualRowData.isNullAt(i));
        continue;
      }
      Object expected = expectedRecord.get(i);
      LogicalType logicalType = rowType.getTypeAt(i);

      final int fieldPos = i;
      assertEquals(types.get(i), rowType.getTypeAt(i), expected,
          () -> RowData.createFieldGetter(logicalType, fieldPos).getFieldOrNull(actualRowData));
    }
  }

  private static void assertEquals(Type type, LogicalType logicalType, Object expected, Supplier<Object> supplier) {
    switch (type.typeId()) {
      case BOOLEAN:
        Assert.assertEquals("boolean value should be equal", expected, supplier.get());
        break;
      case INTEGER:
        Assert.assertEquals("int value should be equal", expected, supplier.get());
        break;
      case LONG:
        Assert.assertEquals("long value should be equal", expected, supplier.get());
        break;
      case FLOAT:
        Assert.assertEquals("float value should be equal", expected, supplier.get());
        break;
      case DOUBLE:
        Assert.assertEquals("double value should be equal", expected, supplier.get());
        break;
      case STRING:
        Assert.assertTrue("Should expect a CharSequence", expected instanceof CharSequence);
        Assert.assertEquals("string should be equal",
            String.valueOf(expected), supplier.get().toString());
        break;
      case DATE:
        Assert.assertTrue("Should expect a Date", expected instanceof LocalDate);
        LocalDate date = ChronoUnit.DAYS.addTo(EPOCH_DAY, (int) supplier.get());
        Assert.assertEquals("date should be equal", expected, date);
        break;
      case TIME:
        Assert.assertTrue("Should expect a LocalTime", expected instanceof LocalTime);
        int milliseconds = (int) (((LocalTime) expected).toNanoOfDay() / 1000_000);
        Assert.assertEquals("time millis should be equal", milliseconds, supplier.get());
        break;
      case TIMESTAMP:
        if (((Types.TimestampType) type.asPrimitiveType()).shouldAdjustToUTC()) {
          Assert.assertTrue("Should expect a OffsetDataTime", expected instanceof OffsetDateTime);
          OffsetDateTime ts = (OffsetDateTime) expected;
          Assert.assertEquals("OffsetDataTime should be equal", ts.toLocalDateTime(),
              ((TimestampData) supplier.get()).toLocalDateTime());
        } else {
          Assert.assertTrue("Should expect a LocalDataTime", expected instanceof LocalDateTime);
          LocalDateTime ts = (LocalDateTime) expected;
          Assert.assertEquals("LocalDataTime should be equal", ts,
              ((TimestampData) supplier.get()).toLocalDateTime());
        }
        break;
      case BINARY:
        Assert.assertTrue("Should expect a ByteBuffer", expected instanceof ByteBuffer);
        Assert.assertArrayEquals("binary should be equal", ((ByteBuffer) expected).array(), (byte[]) supplier.get());
        break;
      case DECIMAL:
        Assert.assertTrue("Should expect a BigDecimal", expected instanceof BigDecimal);
        BigDecimal bd = (BigDecimal) expected;
        Assert.assertEquals("decimal value should be equal", bd,
            ((DecimalData) supplier.get()).toBigDecimal());
        break;
      case LIST: // TODO call assertArrayValues.
      case MAP:
      case STRUCT:
      case UUID:
      case FIXED:
        // TODO;
      default:
        throw new IllegalArgumentException("Not a supported type: " + type);
    }
  }

  private static void assertArrayValues(Type type, LogicalType logicalType, Collection<?> expectedArray,
                                        ArrayData actualArray) {
    List<?> expectedElements = Lists.newArrayList(expectedArray);
    for (int i = 0; i < expectedArray.size(); i += 1) {
      if (expectedElements.get(i) == null) {
        Assert.assertTrue(actualArray.isNullAt(i));
        continue;
      }

      Object expected = expectedElements.get(i);

      final int pos = i;
      assertEquals(type, logicalType, expected,
          () -> ArrayData.createElementGetter(logicalType).getElementOrNull(actualArray, pos));
    }
  }
}


private static class RandomRecordGenerator extends RandomDataGenerator<Record> {
private RandomRecordGenerator(long seed) {
public static class RandomRecordGenerator extends RandomDataGenerator<Record> {
Member

Collaborator Author

Fixed.

@chenjunjiedada
Collaborator Author

@rdblue @openinx, would you please take another look? This is ready for another round.

}

public static Iterable<Record> generateFallbackRecords(Schema schema, int numRecords, long seed, long numDictRows) {
return generateRecords(schema, numRecords, new FallbackGenerator(seed, numDictRows));
Member

I think we'd better create the records lazily for the Iterable<Record> result (similar to Spark's RandomData), because if we want to generate lots of records, materializing them eagerly could easily cause an OOM.
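A minimal sketch (illustrative names, not the PR's RandomData) of lazy generation via an Iterable that produces records on demand while iterating, instead of building a full List up front:

import java.util.Iterator;
import java.util.function.Supplier;

class LazyData<T> implements Iterable<T> {
  private final int numRecords;
  private final Supplier<T> generator;

  LazyData(int numRecords, Supplier<T> generator) {
    this.numRecords = numRecords;
    this.generator = generator;
  }

  @Override
  public Iterator<T> iterator() {
    return new Iterator<T>() {
      private int count = 0;

      @Override
      public boolean hasNext() {
        return count < numRecords;
      }

      @Override
      public T next() {
        count += 1;
        return generator.get(); // generate each record on demand
      }
    };
  }
}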

Collaborator Author

Fixed.

return generateRecords(schema, numRecords, new RandomRecordGenerator(seed));
}

public static List<Record> generateRecords(Schema schema, int numRecords,
Member

We usually don't make this method public, because RandomRecordGenerator is a private static class and others can't actually call this method anyway.

Collaborator Author

Fixed.

case INT_8:
case INT_16:
case INT_32:
case DATE:
Member

The DATE type should always be an integer? To me it seems more reasonable to move the DATE case to where INT_64 is, because both are sure to use the UnboxedReader.

Collaborator Author

Agreed, updated.

return TimestampData.fromLocalDateTime(Instant.ofEpochSecond(Math.floorDiv(value, 1000_000),
Math.floorMod(value, 1000_000) * 1000)
.atOffset(ZoneOffset.UTC)
.toLocalDateTime());
Member

Is this correct? To my understanding, a timestamp with time zone doesn't need to be converted to a LocalDateTime (because it has its own time zone), while a timestamp without time zone does.

Collaborator Author

I think this is constructing the TimestampData from a LocalDateTime, not converting it to a LocalDateTime.

Member

OK. But I think we could construct the TimestampData in a simpler way, like the following:

      long micros = readLong();
      return TimestampData.fromEpochMillis(Math.floorDiv(micros, 1_000),
          (int) Math.floorMod(micros, 1_000) * 1_000);

BTW, we don't need two timestamp readers (TimestampTzReader and TimestampReader), because in both cases we just read the timestamp into a TimestampData and there's no difference. So we could keep only one timestamp reader and name it TimestampMicrosReader.

Collaborator Author

In the previous version I used that approach, and I changed it to this one to make it symmetrical. That doesn't look necessary now; I changed it back.

public DecimalData read(DecimalData ignored) {
Binary binary = column.nextBinary();
BigDecimal bigDecimal = new BigDecimal(new BigInteger(binary.getBytes()), scale);
return DecimalData.fromBigDecimal(bigDecimal, bigDecimal.precision(), scale);
Member

Here we pass bigDecimal.precision() to DecimalData.fromBigDecimal, which I think is incorrect, because the precision expected by DecimalData is the precision of the SQL data type; it uses this value for rounding etc. See the comments in DecimalData:

	// The semantics of the fields are as follows:
	//  - `precision` and `scale` represent the precision and scale of SQL decimal type
	//  - If `decimalVal` is set, it represents the whole decimal value
	//  - Otherwise, the decimal value is longVal/(10^scale).
	//
	// Note that the (precision, scale) must be correct.
	// if precision > MAX_COMPACT_PRECISION,
	//   `decimalVal` represents the value. `longVal` is undefined
	// otherwise, (longVal, scale) represents the value
	//   `decimalVal` may be set and cached

	final int precision;
	final int scale;

I think we need to pass the decimal type's precision and scale, and then use DecimalData.fromUnscaledBytes(binary.getBytes(), precision, scale) to construct the DecimalData.
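As a hedged sketch of the suggested construction, assuming the reader already carries the declared type's precision and scale (the class and method names here are illustrative):

import org.apache.flink.table.data.DecimalData;
import org.apache.parquet.io.api.Binary;

class DecimalReadSketch {
  // precision and scale come from the declared decimal type, not from the value itself.
  static DecimalData readDecimal(Binary binary, int precision, int scale) {
    // bigDecimal.precision() would reflect the value's own digit count, which may not match
    // the column type; DecimalData needs the SQL type's precision for its internal checks.
    return DecimalData.fromUnscaledBytes(binary.getBytes(), precision, scale);
  }
}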

Member

BTW, could we add a few unit tests to cover this?

Contributor

@openinx is right; it should be the same as FlinkValueReaders.DecimalReader.

Collaborator Author

Fixed. I think we can add unit tests when the write side is ready, so that we can write RowData and read it back to validate. Does that make sense to you?

Member

Yeah, makes sense. I think we need a TODO comment here so that we don't forget.

Collaborator Author

Done.

value = (E) list.values[readPos];
}

readPos += 1;
Member

Q: In the case readPos >= list.capacity(), it seems we don't need to increment readPos? To my understanding, getElement should return null in that case, meaning we will read the value without a provided reuse element. Then we also shouldn't advance readPos, because we don't actually reuse any object from the list.

Of course, if we increment readPos here the logic is still correct, but it seems to leave less chance of reusing objects in the array.

What do you think?

Collaborator Author

addElement will put the newly read element into the grown buffer, and that slot should not be reused. For example:

when capacity = 8 and readPos = 8, the list will grow its capacity to 16, and addElement will fill list[8] with the newly read object, which should not be overwritten.

Member

OK, you are right.

kv = entry;
}

readPos += 1;
Member

Same question.

Assert.assertTrue("expected Record and actual RowData should be both null or not null",
expectedRecord != null && actualRowData != null);

List<Type> types = new ArrayList<>();
Member

nit: Lists.newArrayList.

Collaborator Author

Fixed.

Member (@openinx left a review comment)

Almost looks good to me now; I left a few comments.

Assert.assertEquals("time millis should be equal", milliseconds, supplier.get());
break;
case TIMESTAMP:
if (((Types.TimestampType) type.asPrimitiveType()).shouldAdjustToUTC()) {
Member

nit: use ((Types.TimestampType) type).shouldAdjustToUTC().

Collaborator Author

Done.

Comment on lines 503 to 530
protected void setNull(GenericRowData row, int pos) {
row.setField(pos, null);
}

@Override
protected void setBoolean(GenericRowData row, int pos, boolean value) {
row.setField(pos, value);
}

@Override
protected void setInteger(GenericRowData row, int pos, int value) {
row.setField(pos, value);
}

@Override
protected void setLong(GenericRowData row, int pos, long value) {
row.setField(pos, value);
}

@Override
protected void setFloat(GenericRowData row, int pos, float value) {
row.setField(pos, value);
}

@Override
protected void setDouble(GenericRowData row, int pos, double value) {
row.setField(pos, value);
}
Member

Flink's GenericRowData doesn't provide setInteger/setLong/setFloat interfaces, etc., so it seems we don't have to override those methods from ParquetValueReaders.StructReader; all of them have a default implementation, for example:

    protected void setBoolean(I struct, int pos, boolean value) {
      set(struct, pos, value);
    }

I mean overriding set(I struct, int pos, Object value) is enough.

BTW, I saw that the Setter interface in ParquetValueReaders.StructReader doesn't have any meaningful usage. Is it necessary to create a separate pull request to remove all the methods related to Setter? cc @rdblue.
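For illustration, a hedged sketch of relying on the default typed setters by overriding only set() for GenericRowData (simplified, standalone, and not the PR's actual StructReader subclass):

import org.apache.flink.table.data.GenericRowData;

class RowDataSetterSketch {
  // The typed setters (setBoolean, setInteger, ...) default to delegating to set(),
  // so for GenericRowData a single override that stores the boxed value is enough.
  protected void set(GenericRowData row, int pos, Object value) {
    row.setField(pos, value);
  }
}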

@chenjunjiedada
Collaborator Author

@openinx, I addressed your latest comments. Thanks for your detailed review!

@chenjunjiedada
Collaborator Author

@rdblue, would you please take another look? I will rebase the writer side on this.

@rdblue
Contributor

rdblue commented Aug 11, 2020

I should have time to review this tomorrow, or Wednesday at the latest. Thanks, @chenjunjiedada!

private static final OffsetDateTime EPOCH = Instant.ofEpochMilli(0L).atOffset(ZoneOffset.UTC);
private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();

public static void assertRowData(Type type, LogicalType rowType, Record expectedRecord, RowData actualRowData) {
Contributor

If the objects to validate are Record and RowData, then type should be StructType.

Collaborator Author

Done.

}
}

private static void assertEquals(Type type, LogicalType logicalType, Object expected, Supplier<Object> supplier) {
Contributor

Why call supplier.get() in every case when you could pass an object instead of a supplier?

Collaborator Author

Fixed.

Contributor

Sorry, I meant to remove the supplier entirely. Why is it necessary to pass Supplier<Object> and not just Object to this method? As far as I can tell, get is called on the supplier immediately in all cases.

case INT_64:
return new ParquetValueReaders.UnboxedReader<>(desc);
case TIMESTAMP_MICROS:
return new TimestampMicrosReader(desc);
Contributor

Shouldn't this check whether the timestamp is a TIMESTAMP WITH ZONE or TIMESTAMP WITHOUT ZONE and return the correct reader? Spark doesn't do this because it doesn't support timestamp without zone, so we know it is always a timestamptz.

Collaborator Author

Updated. @openinx, you might want to check this. I changed it back to two readers because the type in an imported Parquet file could carry time zone info as well.
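As a hedged sketch of the two conversions discussed above, assuming micros is microseconds since the epoch (the class and method names here are illustrative, not the PR's readers):

import java.time.Instant;
import java.time.ZoneOffset;
import org.apache.flink.table.data.TimestampData;

class TimestampSketch {
  // timestamp with zone: keep the instant; TimestampData stores millis plus nanos-of-millisecond.
  static TimestampData readTz(long micros) {
    return TimestampData.fromEpochMillis(
        Math.floorDiv(micros, 1_000),
        (int) (Math.floorMod(micros, 1_000) * 1_000));
  }

  // timestamp without zone: interpret the micros as a local date-time.
  static TimestampData readLocal(long micros) {
    return TimestampData.fromLocalDateTime(
        Instant.ofEpochSecond(
            Math.floorDiv(micros, 1_000_000),
            Math.floorMod(micros, 1_000_000) * 1_000)
        .atOffset(ZoneOffset.UTC)
        .toLocalDateTime());
  }
}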

@rdblue
Contributor

rdblue commented Aug 12, 2020

Thanks, @chenjunjiedada. I think the implementation and tests are correct, but I'd like to fix the data assertions before merging.

@chenjunjiedada
Collaborator Author

@rdblue, I addressed all your comments. Please take another look at your convenience.

@rdblue
Contributor

rdblue commented Aug 20, 2020

The only remaining issue that I see is that this uses Supplier when I don't think it needs to, but we can follow up. I'll merge this to unblock the next steps.

@rdblue rdblue merged commit 6cb2db7 into apache:master Aug 20, 2020
@chenjunjiedada chenjunjiedada deleted the use-schema-visitor-for-flink-reader branch August 20, 2020 03:11