@@ -595,7 +595,6 @@ public static Schema getNestedFieldSchemaFromRecord(GenericRecord record, String
throw new HoodieException("Failed to get schema. Not a valid field name: " + fieldName);
}


/**
* Get schema for the given field and write schema. Field can be nested, denoted by dot notation. e.g: a.b.c
* Use this method when record is not available. Otherwise, prefer to use {@link #getNestedFieldSchemaFromRecord(GenericRecord, String)}
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.common.testutils;

import org.apache.hudi.avro.MercifulJsonConverter;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

/**
* Generic class for specific payload implementations to inherit from.
*/
public abstract class GenericTestPayload {
Review comment (Member):
Maybe rename to AbstractJsonTestPayload? It's essentially for JSON data, right?


protected static final transient ObjectMapper OBJECT_MAPPER = new ObjectMapper();
protected String partitionPath;
protected String rowKey;
protected byte[] jsonDataCompressed;
protected int dataSize;
protected boolean isDeleted;
protected Comparable orderingVal;

public GenericTestPayload(Option<String> jsonData, String rowKey, String partitionPath, String schemaStr,
Boolean isDeleted, Comparable orderingVal) throws IOException {
if (jsonData.isPresent()) {
this.jsonDataCompressed = compressData(jsonData.get());
this.dataSize = jsonData.get().length();
}
this.rowKey = rowKey;
this.partitionPath = partitionPath;
this.isDeleted = isDeleted;
this.orderingVal = orderingVal;
}

public GenericTestPayload(String jsonData, String rowKey, String partitionPath, String schemaStr) throws IOException {
this(Option.of(jsonData), rowKey, partitionPath, schemaStr, false, 0L);
}

public GenericTestPayload(String jsonData) throws IOException {
this.jsonDataCompressed = compressData(jsonData);
this.dataSize = jsonData.length();
Map<String, Object> jsonRecordMap = OBJECT_MAPPER.readValue(jsonData, Map.class);
this.rowKey = jsonRecordMap.get("_row_key").toString();
this.partitionPath = jsonRecordMap.get("time").toString().split("T")[0].replace("-", "/");
Review comment (Member):
I recall this logic has been refactored in the current RawTripTestPayload.

this.isDeleted = false;
}

public String getPartitionPath() {
return partitionPath;
}

public IndexedRecord getRecordToInsert(Schema schema) throws IOException {
MercifulJsonConverter jsonConverter = new MercifulJsonConverter();
return jsonConverter.convert(getJsonData(), schema);
}

public String getRowKey() {
return rowKey;
}

public String getJsonData() throws IOException {
return unCompressData(jsonDataCompressed);
}

private byte[] compressData(String jsonData) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DeflaterOutputStream dos = new DeflaterOutputStream(baos, new Deflater(Deflater.BEST_COMPRESSION), true);
try {
dos.write(jsonData.getBytes());
} finally {
dos.flush();
dos.close();
Review comment (Member):
Will this close the ByteArrayOutputStream too?

}
return baos.toByteArray();
}

private String unCompressData(byte[] data) throws IOException {
try (InflaterInputStream iis = new InflaterInputStream(new ByteArrayInputStream(data))) {
return FileIOUtils.readAsUTFString(iis, dataSize);
}
}
}
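On the reviewer's question about closing streams: DeflaterOutputStream extends FilterOutputStream, so closing it also closes the wrapped ByteArrayOutputStream (whose close() is a no-op anyway). A possible try-with-resources variant of compressData, shown only as a sketch and not as part of this PR:

private byte[] compressData(String jsonData) throws IOException {
  ByteArrayOutputStream baos = new ByteArrayOutputStream();
  // Closing the DeflaterOutputStream finishes the deflater and then closes the
  // wrapped ByteArrayOutputStream as well; ByteArrayOutputStream.close() is a no-op.
  try (DeflaterOutputStream dos =
           new DeflaterOutputStream(baos, new Deflater(Deflater.BEST_COMPRESSION), true)) {
    dos.write(jsonData.getBytes());
  }
  return baos.toByteArray();
}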
@@ -28,6 +28,7 @@
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -133,6 +134,7 @@ public class HoodieTestDataGenerator implements AutoCloseable {
+ "{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},"
+ "{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}";

public static final String S3_EVENTS_SCHEMA = S3EventsSchemaUtils.generateSchemaString();
public static final String NULL_SCHEMA = Schema.create(Schema.Type.NULL).toString();
public static final String TRIP_HIVE_COLUMN_TYPES = "bigint,string,string,string,string,double,double,double,double,int,bigint,float,binary,int,bigint,decimal(10,6),"
+ "map<string,string>,struct<amount:double,currency:string>,array<struct<amount:double,currency:string>>,boolean";
@@ -216,13 +218,15 @@ public int getEstimatedFileSizeInBytes(int numOfRecords) {
return numOfRecords * BYTES_PER_RECORD + BLOOM_FILTER_BYTES;
}

public RawTripTestPayload generateRandomValueAsPerSchema(String schemaStr, HoodieKey key, String commitTime, boolean isFlattened) throws IOException {
public HoodieRecordPayload generateRandomValueAsPerSchema(String schemaStr, HoodieKey key, String commitTime, boolean isFlattened) throws IOException {
Review comment (Member):
Why this rename?

if (TRIP_EXAMPLE_SCHEMA.equals(schemaStr)) {
return generateRandomValue(key, commitTime, isFlattened);
} else if (TRIP_SCHEMA.equals(schemaStr)) {
return generatePayloadForTripSchema(key, commitTime);
} else if (SHORT_TRIP_SCHEMA.equals(schemaStr)) {
return generatePayloadForShortTripSchema(key, commitTime);
} else if (S3_EVENTS_SCHEMA.equals(schemaStr)) {
return generatePayloadForS3EventsSchema(key, commitTime);
}

return null;
@@ -276,6 +280,12 @@ public RawTripTestPayload generatePayloadForShortTripSchema(HoodieKey key, Strin
return new RawTripTestPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), SHORT_TRIP_SCHEMA);
}

public S3EventTestPayload generatePayloadForS3EventsSchema(HoodieKey key, String commitTime) throws IOException {
// S3 filters by file format in default mode.
GenericRecord rec = generateRecordForS3EventSchema(key.getRecordKey(), "file-obj-key-" + commitTime + ".parquet", 1024L);
return new S3EventTestPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), S3_EVENTS_SCHEMA);
}

/**
* Generates a new avro record of the above schema format for a delete.
*/
@@ -380,6 +390,11 @@ public GenericRecord generateRecordForShortTripSchema(String rowKey, String ride
return rec;
}

public GenericRecord generateRecordForS3EventSchema(String rowKey, String objKey, Long objSize) {
GenericRecord objRecord = S3EventsSchemaUtils.generateObjInfoRecord(objKey, objSize);
return S3EventsSchemaUtils.generateS3EventRecord(rowKey, objRecord);
}

public static void createCommitFile(String basePath, String instantTime, Configuration configuration) {
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
createCommitFile(basePath, instantTime, configuration, commitMetadata);
@@ -481,6 +496,14 @@ public List<HoodieRecord> generateInserts(String instantTime, Integer n) {
return generateInserts(instantTime, n, false);
}

/**
* Generates new inserts with given schema, uniformly across the partition paths above.
* It also updates the list of existing keys.
*/
public List<HoodieRecord> generateInsertsWithSchema(String instantTime, Integer n, String schemaStr) {
return generateInsertsWithSchema(instantTime, n, schemaStr, false);
}

/**
* Generates new inserts, uniformly across the partition paths above.
* It also updates the list of existing keys.
@@ -494,6 +517,21 @@ public List<HoodieRecord> generateInserts(String instantTime, Integer n, boolean
return generateInsertsStream(instantTime, n, isFlattened, TRIP_EXAMPLE_SCHEMA).collect(Collectors.toList());
}

/**
* Generates new inserts, uniformly across the partition paths above.
* It also updates the list of existing keys.
*
* @param instantTime Commit time to use.
* @param n Number of records.
* @param schemaStr Schema String to generate data for.
* @param isFlattened whether the schema of the generated record is flattened
* @return List of {@link HoodieRecord}s
*/
public List<HoodieRecord> generateInsertsWithSchema(String instantTime, Integer n, String schemaStr,
boolean isFlattened) {
return generateInsertsStream(instantTime, n, isFlattened, schemaStr).collect(Collectors.toList());
}
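A brief usage sketch of the new overload (the instant time, record count, and variable names below are illustrative, not taken from this PR):

HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
// Generate 10 inserts conforming to the S3 events schema for a hypothetical instant time "001".
List<HoodieRecord> s3Records =
    dataGen.generateInsertsWithSchema("001", 10, HoodieTestDataGenerator.S3_EVENTS_SCHEMA);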

/**
* Generates new inserts, uniformly across the partition paths above. It also updates the list of existing keys.
*/
@@ -23,61 +23,34 @@
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

/**
* Example row change event based on some example data used by testcases. The data avro schema is
* src/test/resources/schema1.
*/
public class RawTripTestPayload implements HoodieRecordPayload<RawTripTestPayload> {

private static final transient ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private String partitionPath;
private String rowKey;
private byte[] jsonDataCompressed;
private int dataSize;
private boolean isDeleted;
private Comparable orderingVal;
public class RawTripTestPayload extends GenericTestPayload implements HoodieRecordPayload<RawTripTestPayload> {

public RawTripTestPayload(Option<String> jsonData, String rowKey, String partitionPath, String schemaStr,
Boolean isDeleted, Comparable orderingVal) throws IOException {
if (jsonData.isPresent()) {
this.jsonDataCompressed = compressData(jsonData.get());
this.dataSize = jsonData.get().length();
}
this.rowKey = rowKey;
this.partitionPath = partitionPath;
this.isDeleted = isDeleted;
this.orderingVal = orderingVal;
super(jsonData, rowKey, partitionPath, schemaStr, isDeleted, orderingVal);
}

public RawTripTestPayload(String jsonData, String rowKey, String partitionPath, String schemaStr) throws IOException {
this(Option.of(jsonData), rowKey, partitionPath, schemaStr, false, 0L);
}

public RawTripTestPayload(String jsonData) throws IOException {
this.jsonDataCompressed = compressData(jsonData);
this.dataSize = jsonData.length();
Map<String, Object> jsonRecordMap = OBJECT_MAPPER.readValue(jsonData, Map.class);
this.rowKey = jsonRecordMap.get("_row_key").toString();
this.partitionPath = jsonRecordMap.get("time").toString().split("T")[0].replace("-", "/");
this.isDeleted = false;
super(jsonData);
}

/**
@@ -119,11 +92,6 @@ public static List<String> deleteRecordsToStrings(List<HoodieKey> records) {
.collect(Collectors.toList());
}

public String getPartitionPath() {
return partitionPath;
}

@Override
public RawTripTestPayload preCombine(RawTripTestPayload oldValue) {
if (oldValue.orderingVal.compareTo(orderingVal) > 0) {
// pick the payload with greatest ordering value
@@ -148,11 +116,6 @@ public Option<IndexedRecord> getInsertValue(Schema schema) throws IOException {
}
}

public IndexedRecord getRecordToInsert(Schema schema) throws IOException {
MercifulJsonConverter jsonConverter = new MercifulJsonConverter();
return jsonConverter.convert(getJsonData(), schema);
}

@Override
public Option<Map<String, String>> getMetadata() {
// Let's assume we want to count the number of input row change events
@@ -162,38 +125,11 @@ public Option<Map<String, String>> getMetadata() {
return Option.of(metadataMap);
}

public String getRowKey() {
return rowKey;
}

public String getJsonData() throws IOException {
return unCompressData(jsonDataCompressed);
}

private byte[] compressData(String jsonData) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DeflaterOutputStream dos = new DeflaterOutputStream(baos, new Deflater(Deflater.BEST_COMPRESSION), true);
try {
dos.write(jsonData.getBytes());
} finally {
dos.flush();
dos.close();
}
return baos.toByteArray();
}

private String unCompressData(byte[] data) throws IOException {
try (InflaterInputStream iis = new InflaterInputStream(new ByteArrayInputStream(data))) {
return FileIOUtils.readAsUTFString(iis, dataSize);
}
}

public RawTripTestPayload clone() {
try {
return new RawTripTestPayload(unCompressData(jsonDataCompressed), rowKey, partitionPath, null);
} catch (IOException e) {
return null;
}
}

}
@@ -0,0 +1,53 @@
package org.apache.hudi.common.testutils;

import org.apache.hudi.avro.MercifulJsonConverter;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;

import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;

import java.io.IOException;
import java.util.Map;

/**
* Test payload for S3 event here (https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html).
*/
public class S3EventTestPayload extends GenericTestPayload implements HoodieRecordPayload<S3EventTestPayload> {
Review comment (Member):
I'd suggest testing with DefaultHoodieRecordPayload and a specific S3 event schema instead of creating a new test payload, since we want the test to be as close to the real scenario as possible. Besides, we don't couple a payload with a schema; the payload is only responsible for how to merge.

Review comment (Member):
There is a lot of existing misuse of RawTripTestPayload (see https://issues.apache.org/jira/browse/HUDI-6164), so you may want to decouple the improvement changes from the payload changes.

public S3EventTestPayload(Option<String> jsonData, String rowKey, String partitionPath, String schemaStr,
Boolean isDeleted, Comparable orderingVal) throws IOException {
super(jsonData, rowKey, partitionPath, schemaStr, isDeleted, orderingVal);
}

public S3EventTestPayload(String jsonData, String rowKey, String partitionPath, String schemaStr) throws IOException {
this(Option.of(jsonData), rowKey, partitionPath, schemaStr, false, 0L);
}

public S3EventTestPayload(String jsonData) throws IOException {
super(jsonData);
}

public S3EventTestPayload preCombine(S3EventTestPayload oldValue) {
throw new UnsupportedOperationException("preCombine not implemented for S3EventTestPayload");
}

@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord oldRec, Schema schema) throws IOException {
throw new UnsupportedOperationException("combineAndGetUpdateValue not implemented for S3EventTestPayload");
}

@Override
public Option<IndexedRecord> getInsertValue(Schema schema) throws IOException {
if (isDeleted) {
return Option.empty();
} else {
MercifulJsonConverter jsonConverter = new MercifulJsonConverter();
return Option.of(jsonConverter.convert(getJsonData(), schema));
}
}

@Override
public Option<Map<String, String>> getMetadata() {
return Option.empty();
}
}
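A rough sketch of the first reviewer suggestion above: instead of a schema-specific test payload, the generator could hand back the stock DefaultHoodieRecordPayload. The helper name is hypothetical, and the DefaultHoodieRecordPayload(GenericRecord, Comparable) constructor is assumed from the existing codebase; this is an illustration, not code from this PR.

// Hypothetical helper on HoodieTestDataGenerator, mirroring generatePayloadForS3EventsSchema.
public HoodieRecordPayload generateDefaultPayloadForS3EventsSchema(HoodieKey key, String commitTime) {
  GenericRecord rec = generateRecordForS3EventSchema(key.getRecordKey(),
      "file-obj-key-" + commitTime + ".parquet", 1024L);
  // Wrap the Avro record in the stock payload so the test exercises the real merge path.
  return new DefaultHoodieRecordPayload(rec, 0L);
}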