diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index 2f226b2d46098..ffa503014b2f5 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -595,7 +595,6 @@ public static Schema getNestedFieldSchemaFromRecord(GenericRecord record, String throw new HoodieException("Failed to get schema. Not a valid field name: " + fieldName); } - /** * Get schema for the given field and write schema. Field can be nested, denoted by dot notation. e.g: a.b.c * Use this method when record is not available. Otherwise, prefer to use {@link #getNestedFieldSchemaFromRecord(GenericRecord, String)} diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/GenericTestPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/GenericTestPayload.java new file mode 100644 index 0000000000000..25ddb11f2fb07 --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/GenericTestPayload.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.common.testutils; + +import org.apache.hudi.avro.MercifulJsonConverter; +import org.apache.hudi.common.util.FileIOUtils; +import org.apache.hudi.common.util.Option; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.avro.Schema; +import org.apache.avro.generic.IndexedRecord; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Map; +import java.util.zip.Deflater; +import java.util.zip.DeflaterOutputStream; +import java.util.zip.InflaterInputStream; + +/** + * Generic class for specific payload implementations to inherit from. 
+ */ +public abstract class GenericTestPayload { + + protected static final transient ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + protected String partitionPath; + protected String rowKey; + protected byte[] jsonDataCompressed; + protected int dataSize; + protected boolean isDeleted; + protected Comparable orderingVal; + + public GenericTestPayload(Option jsonData, String rowKey, String partitionPath, String schemaStr, + Boolean isDeleted, Comparable orderingVal) throws IOException { + if (jsonData.isPresent()) { + this.jsonDataCompressed = compressData(jsonData.get()); + this.dataSize = jsonData.get().length(); + } + this.rowKey = rowKey; + this.partitionPath = partitionPath; + this.isDeleted = isDeleted; + this.orderingVal = orderingVal; + } + + public GenericTestPayload(String jsonData, String rowKey, String partitionPath, String schemaStr) throws IOException { + this(Option.of(jsonData), rowKey, partitionPath, schemaStr, false, 0L); + } + + public GenericTestPayload(String jsonData) throws IOException { + this.jsonDataCompressed = compressData(jsonData); + this.dataSize = jsonData.length(); + Map jsonRecordMap = OBJECT_MAPPER.readValue(jsonData, Map.class); + this.rowKey = jsonRecordMap.get("_row_key").toString(); + this.partitionPath = jsonRecordMap.get("time").toString().split("T")[0].replace("-", "/"); + this.isDeleted = false; + } + + public String getPartitionPath() { + return partitionPath; + } + + public IndexedRecord getRecordToInsert(Schema schema) throws IOException { + MercifulJsonConverter jsonConverter = new MercifulJsonConverter(); + return jsonConverter.convert(getJsonData(), schema); + } + + public String getRowKey() { + return rowKey; + } + + public String getJsonData() throws IOException { + return unCompressData(jsonDataCompressed); + } + + private byte[] compressData(String jsonData) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DeflaterOutputStream dos = new DeflaterOutputStream(baos, new Deflater(Deflater.BEST_COMPRESSION), true); + try { + dos.write(jsonData.getBytes()); + } finally { + dos.flush(); + dos.close(); + } + return baos.toByteArray(); + } + + private String unCompressData(byte[] data) throws IOException { + try (InflaterInputStream iis = new InflaterInputStream(new ByteArrayInputStream(data))) { + return FileIOUtils.readAsUTFString(iis, dataSize); + } + } +} diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java index 8614060126dfa..f225ece8a50d8 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java @@ -28,6 +28,7 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodiePartitionMetadata; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; @@ -133,6 +134,7 @@ public class HoodieTestDataGenerator implements AutoCloseable { + "{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"}," + "{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\": \"_hoodie_is_deleted\", 
\"type\": \"boolean\", \"default\": false}]}"; + public static final String S3_EVENTS_SCHEMA = S3EventsSchemaUtils.generateSchemaString(); public static final String NULL_SCHEMA = Schema.create(Schema.Type.NULL).toString(); public static final String TRIP_HIVE_COLUMN_TYPES = "bigint,string,string,string,string,double,double,double,double,int,bigint,float,binary,int,bigint,decimal(10,6)," + "map,struct,array>,boolean"; @@ -216,13 +218,15 @@ public int getEstimatedFileSizeInBytes(int numOfRecords) { return numOfRecords * BYTES_PER_RECORD + BLOOM_FILTER_BYTES; } - public RawTripTestPayload generateRandomValueAsPerSchema(String schemaStr, HoodieKey key, String commitTime, boolean isFlattened) throws IOException { + public HoodieRecordPayload generateRandomValueAsPerSchema(String schemaStr, HoodieKey key, String commitTime, boolean isFlattened) throws IOException { if (TRIP_EXAMPLE_SCHEMA.equals(schemaStr)) { return generateRandomValue(key, commitTime, isFlattened); } else if (TRIP_SCHEMA.equals(schemaStr)) { return generatePayloadForTripSchema(key, commitTime); } else if (SHORT_TRIP_SCHEMA.equals(schemaStr)) { return generatePayloadForShortTripSchema(key, commitTime); + } else if (S3_EVENTS_SCHEMA.equals(schemaStr)) { + return generatePayloadForS3EventsSchema(key, commitTime); } return null; @@ -276,6 +280,12 @@ public RawTripTestPayload generatePayloadForShortTripSchema(HoodieKey key, Strin return new RawTripTestPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), SHORT_TRIP_SCHEMA); } + public S3EventTestPayload generatePayloadForS3EventsSchema(HoodieKey key, String commitTime) throws IOException { + // S3 filters by file format in default mode. + GenericRecord rec = generateRecordForS3EventSchema(key.getRecordKey(), "file-obj-key-" + commitTime + ".parquet", 1024L); + return new S3EventTestPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), S3_EVENTS_SCHEMA); + } + /** * Generates a new avro record of the above schema format for a delete. */ @@ -380,6 +390,11 @@ public GenericRecord generateRecordForShortTripSchema(String rowKey, String ride return rec; } + public GenericRecord generateRecordForS3EventSchema(String rowKey, String objKey, Long objSize) { + GenericRecord objRecord = S3EventsSchemaUtils.generateObjInfoRecord(objKey, objSize); + return S3EventsSchemaUtils.generateS3EventRecord(rowKey, objRecord); + } + public static void createCommitFile(String basePath, String instantTime, Configuration configuration) { HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); createCommitFile(basePath, instantTime, configuration, commitMetadata); @@ -481,6 +496,14 @@ public List generateInserts(String instantTime, Integer n) { return generateInserts(instantTime, n, false); } + /** + * Generates new inserts with given schema, uniformly across the partition paths above. + * It also updates the list of existing keys. + */ + public List generateInsertsWithSchema(String instantTime, Integer n, String schemaStr) { + return generateInsertsWithSchema(instantTime, n, schemaStr, false); + } + /** * Generates new inserts, uniformly across the partition paths above. * It also updates the list of existing keys. @@ -494,6 +517,21 @@ public List generateInserts(String instantTime, Integer n, boolean return generateInsertsStream(instantTime, n, isFlattened, TRIP_EXAMPLE_SCHEMA).collect(Collectors.toList()); } + /** + * Generates new inserts, uniformly across the partition paths above. + * It also updates the list of existing keys. 
+ * + * @param instantTime Commit time to use. + * @param n Number of records. + * @param schemaStr Schema String to generate data for. + * @param isFlattened whether the schema of the generated record is flattened + * @return List of {@link HoodieRecord}s + */ + public List generateInsertsWithSchema(String instantTime, Integer n, String schemaStr, + boolean isFlattened) { + return generateInsertsStream(instantTime, n, isFlattened, schemaStr).collect(Collectors.toList()); + } + /** * Generates new inserts, uniformly across the partition paths above. It also updates the list of existing keys. */ diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java index cd437db8b3aa9..0626b30f79e88 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/RawTripTestPayload.java @@ -23,48 +23,26 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.Option; -import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.avro.Schema; import org.apache.avro.generic.IndexedRecord; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; -import java.util.zip.Deflater; -import java.util.zip.DeflaterOutputStream; -import java.util.zip.InflaterInputStream; /** * Example row change event based on some example data used by testcases. The data avro schema is * src/test/resources/schema1. 
*/ -public class RawTripTestPayload implements HoodieRecordPayload { - - private static final transient ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private String partitionPath; - private String rowKey; - private byte[] jsonDataCompressed; - private int dataSize; - private boolean isDeleted; - private Comparable orderingVal; +public class RawTripTestPayload extends GenericTestPayload implements HoodieRecordPayload { public RawTripTestPayload(Option jsonData, String rowKey, String partitionPath, String schemaStr, Boolean isDeleted, Comparable orderingVal) throws IOException { - if (jsonData.isPresent()) { - this.jsonDataCompressed = compressData(jsonData.get()); - this.dataSize = jsonData.get().length(); - } - this.rowKey = rowKey; - this.partitionPath = partitionPath; - this.isDeleted = isDeleted; - this.orderingVal = orderingVal; + super(jsonData, rowKey, partitionPath, schemaStr, isDeleted, orderingVal); } public RawTripTestPayload(String jsonData, String rowKey, String partitionPath, String schemaStr) throws IOException { @@ -72,12 +50,7 @@ public RawTripTestPayload(String jsonData, String rowKey, String partitionPath, } public RawTripTestPayload(String jsonData) throws IOException { - this.jsonDataCompressed = compressData(jsonData); - this.dataSize = jsonData.length(); - Map jsonRecordMap = OBJECT_MAPPER.readValue(jsonData, Map.class); - this.rowKey = jsonRecordMap.get("_row_key").toString(); - this.partitionPath = jsonRecordMap.get("time").toString().split("T")[0].replace("-", "/"); - this.isDeleted = false; + super(jsonData); } /** @@ -119,11 +92,6 @@ public static List deleteRecordsToStrings(List records) { .collect(Collectors.toList()); } - public String getPartitionPath() { - return partitionPath; - } - - @Override public RawTripTestPayload preCombine(RawTripTestPayload oldValue) { if (oldValue.orderingVal.compareTo(orderingVal) > 0) { // pick the payload with greatest ordering value @@ -148,11 +116,6 @@ public Option getInsertValue(Schema schema) throws IOException { } } - public IndexedRecord getRecordToInsert(Schema schema) throws IOException { - MercifulJsonConverter jsonConverter = new MercifulJsonConverter(); - return jsonConverter.convert(getJsonData(), schema); - } - @Override public Option> getMetadata() { // Let's assume we want to count the number of input row change events @@ -162,32 +125,6 @@ public Option> getMetadata() { return Option.of(metadataMap); } - public String getRowKey() { - return rowKey; - } - - public String getJsonData() throws IOException { - return unCompressData(jsonDataCompressed); - } - - private byte[] compressData(String jsonData) throws IOException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DeflaterOutputStream dos = new DeflaterOutputStream(baos, new Deflater(Deflater.BEST_COMPRESSION), true); - try { - dos.write(jsonData.getBytes()); - } finally { - dos.flush(); - dos.close(); - } - return baos.toByteArray(); - } - - private String unCompressData(byte[] data) throws IOException { - try (InflaterInputStream iis = new InflaterInputStream(new ByteArrayInputStream(data))) { - return FileIOUtils.readAsUTFString(iis, dataSize); - } - } - public RawTripTestPayload clone() { try { return new RawTripTestPayload(unCompressData(jsonDataCompressed), rowKey, partitionPath, null); @@ -195,5 +132,4 @@ public RawTripTestPayload clone() { return null; } } - } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/S3EventTestPayload.java 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/S3EventTestPayload.java new file mode 100644 index 0000000000000..6f3f960e1c4e2 --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/S3EventTestPayload.java @@ -0,0 +1,53 @@ +package org.apache.hudi.common.testutils; + +import org.apache.hudi.avro.MercifulJsonConverter; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.util.Option; + +import org.apache.avro.Schema; +import org.apache.avro.generic.IndexedRecord; + +import java.io.IOException; +import java.util.Map; + +/** + * Test payload for S3 event here (https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html). + */ +public class S3EventTestPayload extends GenericTestPayload implements HoodieRecordPayload { + public S3EventTestPayload(Option jsonData, String rowKey, String partitionPath, String schemaStr, + Boolean isDeleted, Comparable orderingVal) throws IOException { + super(jsonData, rowKey, partitionPath, schemaStr, isDeleted, orderingVal); + } + + public S3EventTestPayload(String jsonData, String rowKey, String partitionPath, String schemaStr) throws IOException { + this(Option.of(jsonData), rowKey, partitionPath, schemaStr, false, 0L); + } + + public S3EventTestPayload(String jsonData) throws IOException { + super(jsonData); + } + + public S3EventTestPayload preCombine(S3EventTestPayload oldValue) { + throw new UnsupportedOperationException("preCombine not implemented for S3EventTestPayload"); + } + + @Override + public Option combineAndGetUpdateValue(IndexedRecord oldRec, Schema schema) throws IOException { + throw new UnsupportedOperationException("combineAndGetUpdateValue not implemented for S3EventTestPayload"); + } + + @Override + public Option getInsertValue(Schema schema) throws IOException { + if (isDeleted) { + return Option.empty(); + } else { + MercifulJsonConverter jsonConverter = new MercifulJsonConverter(); + return Option.of(jsonConverter.convert(getJsonData(), schema)); + } + } + + @Override + public Option> getMetadata() { + return Option.empty(); + } +} diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/S3EventsSchemaUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/S3EventsSchemaUtils.java new file mode 100644 index 0000000000000..65f95806b820f --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/S3EventsSchemaUtils.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.common.testutils; + +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; + +// Utility for the schema of S3 events listed here (https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html) +public class S3EventsSchemaUtils { + public static final String DEFAULT_STRING_VALUE = "default_string"; + public static final GenericRecord DEFAULT_S3_BUCKET_RECORD; + static { + GenericRecord rec = new GenericData.Record(generateBucketInfoSchema()); + rec.put("name", "default_s3_bucket"); + DEFAULT_S3_BUCKET_RECORD = rec; + } + + public static String generateSchemaString() { + return generateS3EventSchema().toString(); + } + + public static Schema generateObjInfoSchema() { + Schema objInfo = SchemaBuilder.record("objInfo") + .fields() + .requiredString("key") + .requiredLong("size") + .endRecord(); + return objInfo; + } + + public static Schema generateBucketInfoSchema() { + Schema bucketInfo = SchemaBuilder.record("bucketInfo") + .fields() + .requiredString("name") + .endRecord(); + return bucketInfo; + } + + public static GenericRecord generateObjInfoRecord(String key, Long size) { + GenericRecord rec = new GenericData.Record(generateObjInfoSchema()); + rec.put("key", key); + rec.put("size", size); + return rec; + } + + public static Schema generateS3MetadataSchema() { + Schema s3Metadata = SchemaBuilder.record("s3Metadata") + .fields() + .requiredString("configurationId") + .name("object") + .type(generateObjInfoSchema()) + .noDefault() + .name("bucket") + .type(generateBucketInfoSchema()) + .noDefault() + .endRecord(); + return s3Metadata; + } + + public static GenericRecord generateS3MetadataRecord(GenericRecord objRecord) { + GenericRecord rec = new GenericData.Record(generateS3MetadataSchema()); + rec.put("configurationId", DEFAULT_STRING_VALUE); + rec.put("object", objRecord); + rec.put("bucket", DEFAULT_S3_BUCKET_RECORD); + return rec; + } + + public static Schema generateS3EventSchema() { + Schema s3Event = SchemaBuilder.record("s3Event") + .fields() + .requiredString("eventSource") + .requiredString("eventName") + .requiredString("_row_key") + .name("s3") + .type(generateS3MetadataSchema()) + .noDefault() + .endRecord(); + return s3Event; + } + + public static GenericRecord generateS3EventRecord(String rowKey, GenericRecord objRecord) { + GenericRecord rec = new GenericData.Record(generateS3EventSchema()); + rec.put("_row_key", rowKey); + rec.put("eventSource", DEFAULT_STRING_VALUE); + rec.put("eventName", DEFAULT_STRING_VALUE); + rec.put("s3", generateS3MetadataRecord(objRecord)); + return rec; + } +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java index 7b8232f6194a3..da104af659f3d 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java @@ -28,7 +28,6 @@ import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper; @@ -63,6 +62,7 @@ import static 
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.NUM_INSTANTS_PER_FETCH; import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.READ_LATEST_INSTANT_ON_MISSING_CKPT; import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.SOURCE_FILE_FORMAT; + /** * This source will use the S3 events meta information from hoodie table generate by {@link S3EventsSource}. */ @@ -115,8 +115,7 @@ private DataFrameReader getDataFrameReader(String fileFormat) { return dataFrameReader; } - @Override - public Pair>, String> fetchNextBatch(Option lastCkptStr, long sourceLimit) { + public Pair>, String> fetchMetadata(Option lastCkptStr, long sourceLimit) { DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(HOODIE_SRC_BASE_PATH)); String srcPath = props.getString(HOODIE_SRC_BASE_PATH); int numInstantsPerFetch = props.getInteger(NUM_INSTANTS_PER_FETCH, DEFAULT_NUM_INSTANTS_PER_FETCH); @@ -127,7 +126,6 @@ public Pair>, String> fetchNextBatch(Option lastCkpt if (readLatestOnMissingCkpt) { missingCheckpointStrategy = IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST; } - String fileFormat = props.getString(SOURCE_FILE_FORMAT, DEFAULT_SOURCE_FILE_FORMAT); // Use begin Instant if set and non-empty Option beginInstant = @@ -141,7 +139,7 @@ public Pair>, String> fetchNextBatch(Option lastCkpt if (queryTypeAndInstantEndpts.getValue().getKey().equals(queryTypeAndInstantEndpts.getValue().getValue())) { LOG.warn("Already caught up. Begin Checkpoint was :" + queryTypeAndInstantEndpts.getValue().getKey()); - return Pair.of(Option.empty(), queryTypeAndInstantEndpts.getValue().getKey()); + return Pair.of(Option.empty(), queryTypeAndInstantEndpts.getRight().getRight()); } Dataset source = null; @@ -161,11 +159,18 @@ public Pair>, String> fetchNextBatch(Option lastCkpt .filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, queryTypeAndInstantEndpts.getRight().getRight())); } + return Pair.of(Option.of(source), queryTypeAndInstantEndpts.getRight().getRight()); + } - if (source.isEmpty()) { - return Pair.of(Option.empty(), queryTypeAndInstantEndpts.getRight().getRight()); + @Override + public Pair>, String> fetchNextBatch(Option lastCkptStr, long sourceLimit) { + Pair>, String> sourceMetadata = fetchMetadata(lastCkptStr, sourceLimit); + if (!sourceMetadata.getKey().isPresent()) { + return Pair.of(Option.empty(), sourceMetadata.getRight()); } + Dataset source = sourceMetadata.getKey().get(); + String fileFormat = props.getString(SOURCE_FILE_FORMAT, DEFAULT_SOURCE_FILE_FORMAT); String filter = "s3.object.size > 0"; if (!StringUtils.isNullOrEmpty(props.getString(Config.S3_KEY_PREFIX, null))) { filter = filter + " and s3.object.key like '" + props.getString(Config.S3_KEY_PREFIX) + "%'"; @@ -189,33 +194,36 @@ public Pair>, String> fetchNextBatch(Option lastCkpt .filter(filter) .select("s3.bucket.name", "s3.object.key") .distinct() - .mapPartitions((MapPartitionsFunction) fileListIterator -> { + .rdd() + // JavaRDD simplifies coding with collect and suitable mapPartitions signature. check if this can be avoided. + .toJavaRDD() + .mapPartitions(fileListIterator -> { List cloudFilesPerPartition = new ArrayList<>(); - final Configuration configuration = serializableConfiguration.newCopy(); fileListIterator.forEachRemaining(row -> { + // TODO: configuration is updated in the getFs call. check if new copy is needed w.r.t to getFs. 
+ final Configuration configuration = serializableConfiguration.newCopy(); String bucket = row.getString(0); String filePath = s3Prefix + bucket + "/" + row.getString(1); - String decodeUrl = null; try { - decodeUrl = URLDecoder.decode(filePath, StandardCharsets.UTF_8.name()); + String decodeUrl = URLDecoder.decode(filePath, StandardCharsets.UTF_8.name()); if (checkExists) { FileSystem fs = FSUtils.getFs(s3Prefix + bucket, configuration); - if (fs.exists(new Path(decodeUrl))) { - cloudFilesPerPartition.add(decodeUrl); + try { + if (fs.exists(new Path(decodeUrl))) { + cloudFilesPerPartition.add(decodeUrl); + } + } catch (IOException e) { + LOG.error(String.format("Error while checking path exists for %s ", decodeUrl), e); } } else { cloudFilesPerPartition.add(decodeUrl); } - } catch (IOException e) { - LOG.error(String.format("Error while checking path exists for %s ", decodeUrl), e); - throw new HoodieIOException(String.format("Error while checking path exists for %s ", decodeUrl), e); - } catch (Throwable e) { - LOG.warn("Failed to add cloud file ", e); - throw new HoodieException("Failed to add cloud file", e); + } catch (Exception exception) { + LOG.warn("Failed to add cloud file ", exception); } }); return cloudFilesPerPartition.iterator(); - }, Encoders.STRING()).collectAsList(); + }).collect(); Option> dataset = Option.empty(); if (!cloudFiles.isEmpty()) { @@ -224,6 +232,6 @@ public Pair>, String> fetchNextBatch(Option lastCkpt } LOG.debug("Extracted distinct files " + cloudFiles.size() + " and some samples " + cloudFiles.stream().limit(10).collect(Collectors.toList())); - return Pair.of(dataset, queryTypeAndInstantEndpts.getRight().getRight()); + return Pair.of(dataset, sourceMetadata.getRight()); } -} +} \ No newline at end of file diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java new file mode 100644 index 0000000000000..836a19490dc70 --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.utilities.sources; + +import org.apache.avro.Schema; +import org.apache.hudi.client.SparkRDDWriteClient; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; +import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.testutils.S3EventsSchemaUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieArchivalConfig; +import org.apache.hudi.config.HoodieCleanConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.testutils.SparkClientFunctionalTestHarness; +import org.apache.hudi.utilities.schema.SchemaProvider; +import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.List; +import java.util.Properties; + +import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.S3_EVENTS_SCHEMA; +import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; + +public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarness { + private HoodieTestDataGenerator dataGen; + private HoodieTableMetaClient metaClient; + + @BeforeEach + public void setUp() throws IOException { + dataGen = new HoodieTestDataGenerator(); + metaClient = getHoodieMetaClient(hadoopConf(), basePath()); + } + + @Test + public void testHoodieIncrSource() throws IOException { + HoodieWriteConfig writeConfig = getConfigBuilder(basePath(), metaClient) + .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build()) + .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .withMaxNumDeltaCommitsBeforeCompaction(1).build()) + .build(); + + SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig); + Pair> inserts = writeRecords(writeClient, null, "100"); + Pair> inserts2 = writeRecords(writeClient, null, "200"); + Pair> inserts3 = writeRecords(writeClient, null, "300"); + Pair> inserts4 = writeRecords(writeClient, null, "400"); + Pair> inserts5 = writeRecords(writeClient, null, "500"); + + // read everything upto latest + readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, Option.empty(), 500, inserts5.getKey()); + + // even if the begin timestamp is archived (100), full table scan should kick in, but should filter for records having commit time > 100 + readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, Option.of("100"), 400, inserts5.getKey()); + + // even if the read upto latest is set, if begin timestamp is in active timeline, only incremental should kick in. 
+ readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, Option.of("400"), 100, inserts5.getKey()); + + // read just the latest + readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, Option.empty(), 100, inserts5.getKey()); + + // ensure checkpoint does not move + readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, Option.of(inserts5.getKey()), 0, inserts5.getKey()); + + Pair> inserts6 = writeRecords(writeClient, null, "600"); + + // insert new batch and ensure the checkpoint moves + readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, Option.of(inserts5.getKey()), 100, inserts6.getKey()); + writeClient.close(); + } + + private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, Option checkpointToPull, int expectedCount, String expectedCheckpoint) { + + Properties properties = new Properties(); + properties.setProperty("hoodie.deltastreamer.source.hoodieincr.path", basePath()); + properties.setProperty("hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy", missingCheckpointStrategy.name()); + TypedProperties typedProperties = new TypedProperties(properties); + S3EventsHoodieIncrSource s3IncrSource = new S3EventsHoodieIncrSource(typedProperties, jsc(), spark(), new DummySchemaProvider(S3EventsSchemaUtils.generateS3EventSchema())); + + // read everything until latest + Pair>, String> batchCheckPoint = s3IncrSource.fetchMetadata(checkpointToPull, 500); + Assertions.assertNotNull(batchCheckPoint.getValue()); + if (expectedCount == 0) { + assertFalse(batchCheckPoint.getKey().isPresent()); + } else { + assertEquals(batchCheckPoint.getKey().get().count(), expectedCount); + } + Assertions.assertEquals(batchCheckPoint.getRight(), expectedCheckpoint); + } + + private Pair> writeRecords(SparkRDDWriteClient writeClient, List insertRecords, String commit) throws IOException { + writeClient.startCommitWithTime(commit); + List records = dataGen.generateInsertsWithSchema(commit, 100, S3_EVENTS_SCHEMA); + JavaRDD result = writeClient.upsert(jsc().parallelize(records, 1), commit); + List statuses = result.collect(); + assertNoWriteErrors(statuses); + return Pair.of(commit, records); + } + + private HoodieWriteConfig.Builder getConfigBuilder(String basePath, HoodieTableMetaClient metaClient) { + return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(S3_EVENTS_SCHEMA) + .withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2).withDeleteParallelism(2) + .withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION) + .forTable(metaClient.getTableConfig().getTableName()); + } + + private static class DummySchemaProvider extends SchemaProvider { + + private final Schema schema; + + public DummySchemaProvider(Schema schema) { + super(new TypedProperties()); + this.schema = schema; + } + + @Override + public Schema getSourceSchema() { + return schema; + } + } +}
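
For reviewers unfamiliar with the new test utilities, a minimal usage sketch follows. It is not part of the patch; the class name S3EventTestPayloadExample and the literal key, partition, and size values are illustrative only. It shows how the pieces added above fit together: S3EventsSchemaUtils builds a fake S3 event record, S3EventTestPayload stores it as compressed JSON via GenericTestPayload, and getInsertValue() converts the JSON back into an Avro record through MercifulJsonConverter, which is the same path exercised by HoodieTestDataGenerator.generatePayloadForS3EventsSchema.

// Illustrative sketch only; assumes the test classes introduced in this patch are on the test classpath.
package org.apache.hudi.common.testutils;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

import java.io.IOException;

public class S3EventTestPayloadExample {
  public static void main(String[] args) throws IOException {
    // Build a fake S3 event record using the schema helpers from S3EventsSchemaUtils.
    GenericRecord objInfo = S3EventsSchemaUtils.generateObjInfoRecord("file-obj-key-100.parquet", 1024L);
    GenericRecord s3Event = S3EventsSchemaUtils.generateS3EventRecord("row-key-1", objInfo);

    // The payload keeps the event as compressed JSON (see GenericTestPayload) and, on insert,
    // converts it back into an Avro record through MercifulJsonConverter.
    S3EventTestPayload payload = new S3EventTestPayload(
        s3Event.toString(), "row-key-1", "2020/01/01", HoodieTestDataGenerator.S3_EVENTS_SCHEMA);

    Schema schema = new Schema.Parser().parse(HoodieTestDataGenerator.S3_EVENTS_SCHEMA);
    // getInsertValue returns Option.empty() only when the payload is marked deleted.
    System.out.println("Reconstructed event: " + payload.getInsertValue(schema).get());
  }
}

This is only a sanity-check sketch of the helper classes; the end-to-end coverage of the fetchMetadata/fetchNextBatch split lives in TestS3EventsHoodieIncrSource above.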