diff --git a/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java index 326bdaca0b9a8..600c7d24ea848 100644 --- a/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java @@ -26,6 +26,7 @@ import java.nio.charset.StandardCharsets; import java.text.ParseException; import java.util.Arrays; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.List; @@ -39,6 +40,7 @@ import org.apache.hudi.avro.model.HoodieSavepointMetadata; import org.apache.hudi.client.embedded.EmbeddedTimelineService; import org.apache.hudi.common.HoodieRollbackStat; +import org.apache.hudi.common.model.EmptyHoodieRecordPayload; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieDataFile; import org.apache.hudi.common.model.HoodieKey; @@ -94,6 +96,8 @@ public class HoodieWriteClient extends AbstractHoodieClient { private static Logger logger = LogManager.getLogger(HoodieWriteClient.class); + private static final String UPDATE_STR = "update"; + private static final String LOOKUP_STR = "lookup"; private final boolean rollbackInFlight; private final transient HoodieMetrics metrics; private final transient HoodieIndex index; @@ -103,18 +107,14 @@ public class HoodieWriteClient extends AbstractHo private transient Timer.Context indexTimer = null; /** - * @param jsc - * @param clientConfig - * @throws Exception + * */ public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig) throws Exception { this(jsc, clientConfig, false); } /** - * @param jsc - * @param clientConfig - * @param rollbackInFlight + * */ public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackInFlight) { this(jsc, clientConfig, rollbackInFlight, HoodieIndex.createIndex(clientConfig, jsc)); @@ -150,7 +150,7 @@ public JavaRDD> filterExists(JavaRDD> hoodieReco HoodieTable table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc); indexTimer = metrics.getIndexCtx(); JavaRDD> recordsWithLocation = index.tagLocation(hoodieRecords, jsc, table); - metrics.updateIndexMetrics("lookup", metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop())); + metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop())); indexTimer = null; return recordsWithLocation.filter(v1 -> !v1.isCurrentLocationKnown()); } @@ -159,7 +159,7 @@ public JavaRDD> filterExists(JavaRDD> hoodieReco * Upserts a bunch of new records into the Hoodie table, at the supplied commitTime */ public JavaRDD upsert(JavaRDD> records, final String commitTime) { - HoodieTable table = getTableAndInitCtx(records); + HoodieTable table = getTableAndInitCtx(OperationType.UPSERT); try { // De-dupe/merge if needed JavaRDD> dedupedRecords = @@ -168,7 +168,7 @@ public JavaRDD upsert(JavaRDD> records, final Strin indexTimer = metrics.getIndexCtx(); // perform index loop up to get existing location of records JavaRDD> taggedRecords = index.tagLocation(dedupedRecords, jsc, table); - metrics.updateIndexMetrics("lookup", metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop())); + metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer == null ? 
0L : indexTimer.stop())); indexTimer = null; return upsertRecordsInternal(taggedRecords, commitTime, table, true); } catch (Throwable e) { @@ -189,7 +189,7 @@ public JavaRDD upsert(JavaRDD> records, final Strin * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts */ public JavaRDD upsertPreppedRecords(JavaRDD> preppedRecords, final String commitTime) { - HoodieTable table = getTableAndInitCtx(preppedRecords); + HoodieTable table = getTableAndInitCtx(OperationType.UPSERT_PREPPED); try { return upsertRecordsInternal(preppedRecords, commitTime, table, true); } catch (Throwable e) { @@ -211,7 +211,7 @@ public JavaRDD upsertPreppedRecords(JavaRDD> preppe * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts */ public JavaRDD insert(JavaRDD> records, final String commitTime) { - HoodieTable table = getTableAndInitCtx(records); + HoodieTable table = getTableAndInitCtx(OperationType.INSERT); try { // De-dupe/merge if needed JavaRDD> dedupedRecords = @@ -238,7 +238,7 @@ public JavaRDD insert(JavaRDD> records, final Strin * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts */ public JavaRDD insertPreppedRecords(JavaRDD> preppedRecords, final String commitTime) { - HoodieTable table = getTableAndInitCtx(preppedRecords); + HoodieTable table = getTableAndInitCtx(OperationType.INSERT_PREPPED); try { return upsertRecordsInternal(preppedRecords, commitTime, table, false); } catch (Throwable e) { @@ -281,7 +281,7 @@ public JavaRDD bulkInsert(JavaRDD> records, final S */ public JavaRDD bulkInsert(JavaRDD> records, final String commitTime, Option bulkInsertPartitioner) { - HoodieTable table = getTableAndInitCtx(records); + HoodieTable table = getTableAndInitCtx(OperationType.BULK_INSERT); try { // De-dupe/merge if needed JavaRDD> dedupedRecords = @@ -314,7 +314,7 @@ public JavaRDD bulkInsert(JavaRDD> records, final S */ public JavaRDD bulkInsertPreppedRecords(JavaRDD> preppedRecords, final String commitTime, Option bulkInsertPartitioner) { - HoodieTable table = getTableAndInitCtx(preppedRecords); + HoodieTable table = getTableAndInitCtx(OperationType.BULK_INSERT_PREPPED); try { return bulkInsertInternal(preppedRecords, commitTime, table, bulkInsertPartitioner); } catch (Throwable e) { @@ -325,6 +325,46 @@ public JavaRDD bulkInsertPreppedRecords(JavaRDD> pr } } + /** + * Deletes a list of {@link HoodieKey}s from the Hoodie table, at the supplied commitTime. {@link HoodieKey}s will be + * deduped and non-existent keys will be removed before deleting. + * + * @param keys {@link List} of {@link HoodieKey}s to be deleted + * @param commitTime Commit time handle + * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts + */ + public JavaRDD delete(JavaRDD keys, final String commitTime) { + HoodieTable table = getTableAndInitCtx(OperationType.DELETE); + try { + // De-dupe/merge if needed + JavaRDD dedupedKeys = + config.shouldCombineBeforeDelete() ? 
deduplicateKeys(keys, config.getDeleteShuffleParallelism()) : keys; + + JavaRDD> dedupedRecords = + dedupedKeys.map(key -> new HoodieRecord(key, new EmptyHoodieRecordPayload())); + indexTimer = metrics.getIndexCtx(); + // perform index lookup to get existing location of records + JavaRDD> taggedRecords = index.tagLocation(dedupedRecords, jsc, table); + // filter out non-existent keys/records + JavaRDD> taggedValidRecords = taggedRecords.filter(record -> record.isCurrentLocationKnown()); + if (!taggedValidRecords.isEmpty()) { + metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop())); + indexTimer = null; + return upsertRecordsInternal(taggedValidRecords, commitTime, table, true); + } else { + // if the entire set of keys is non-existent + JavaRDD writeStatusRDD = jsc.parallelize(Collections.EMPTY_LIST, 1); + commitOnAutoCommit(commitTime, writeStatusRDD, table.getMetaClient().getCommitActionType()); + return writeStatusRDD; + } + } catch (Throwable e) { + if (e instanceof HoodieUpsertException) { + throw (HoodieUpsertException) e; + } + throw new HoodieUpsertException("Failed to delete for commit time " + commitTime, e); + } + } + private JavaRDD bulkInsertInternal(JavaRDD> dedupedRecords, String commitTime, HoodieTable table, Option bulkInsertPartitioner) { final JavaRDD> repartitionedRecords; @@ -366,10 +406,7 @@ private void commitOnAutoCommit(String commitTime, JavaRDD resultRD private JavaRDD> combineOnCondition(boolean condition, JavaRDD> records, int parallelism) { - if (condition) { - return deduplicateRecords(records, parallelism); - } - return records; + return condition ? deduplicateRecords(records, parallelism) : records; } /** * @@ -451,7 +488,7 @@ private JavaRDD updateIndexAndCommitIfNeeded(JavaRDD w indexTimer = metrics.getIndexCtx(); // Update the index back JavaRDD statuses = index.updateLocation(writeStatusRDD, jsc, table); - metrics.updateIndexMetrics("update", metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop())); + metrics.updateIndexMetrics(UPDATE_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop())); indexTimer = null; // Trigger the insert and collect statuses commitOnAutoCommit(commitTime, statuses, table.getMetaClient().getCommitActionType()); @@ -501,6 +538,7 @@ private boolean commit(String commitTime, JavaRDD writeStatuses, if (extraMetadata.isPresent()) { extraMetadata.get().forEach(metadata::addMetadata); } + metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, config.getSchema()); try { activeTimeline.saveAsComplete(new HoodieInstant(true, actionType, commitTime), @@ -929,8 +967,6 @@ public void close() { * Clean up any stale/old files/data lying around (either on file storage or index storage) based on the * configurations and CleaningPolicy used. (typically files that no longer can be used by a running query can be * cleaned) - * - * @throws HoodieIOException */ public void clean() throws HoodieIOException { cleanClient.clean(); @@ -942,7 +978,6 @@ public void clean() throws HoodieIOException { * cleaned) * * @param startCleanTime Cleaner Instant Timestamp - * @return * @throws HoodieIOException in case of any IOException */ protected HoodieCleanMetadata clean(String startCleanTime) throws HoodieIOException { @@ -1088,6 +1123,20 @@ JavaRDD> deduplicateRecords(JavaRDD> records, in }, parallelism).map(Tuple2::_2); } + /** + * Deduplicate the given Hoodie keys. 
+ */ + JavaRDD deduplicateKeys(JavaRDD keys, int parallelism) { + boolean isIndexingGlobal = index.isGlobal(); + if (isIndexingGlobal) { + return keys.keyBy(HoodieKey::getRecordKey) + .reduceByKey((key1, key2) -> key1) + .values(); + } else { + return keys.distinct(); + } + } + /** * Cleanup all inflight commits */ @@ -1101,9 +1150,13 @@ private void rollbackInflightCommits() { } } - private HoodieTable getTableAndInitCtx(JavaRDD> records) { + private HoodieTable getTableAndInitCtx(OperationType operationType) { + HoodieTableMetaClient metaClient = createMetaClient(true); + if (operationType == OperationType.DELETE) { + setWriteSchemaFromLastInstant(metaClient); + } // Create a Hoodie table which encapsulated the commits and files visible - HoodieTable table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc); + HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); if (table.getMetaClient().getCommitActionType().equals(HoodieTimeline.COMMIT_ACTION)) { writeContext = metrics.getCommitCtx(); } else { @@ -1112,6 +1165,30 @@ private HoodieTable getTableAndInitCtx(JavaRDD> records) { return table; } + /** + * Sets write schema from last instant since deletes may not have schema set in the config. + */ + private void setWriteSchemaFromLastInstant(HoodieTableMetaClient metaClient) { + try { + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + Option lastInstant = + activeTimeline.filterCompletedInstants().filter(s -> s.getAction().equals(metaClient.getCommitActionType())) + .lastInstant(); + if (lastInstant.isPresent()) { + HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( + activeTimeline.getInstantDetails(lastInstant.get()).get(), HoodieCommitMetadata.class); + if (commitMetadata.getExtraMetadata().containsKey(HoodieCommitMetadata.SCHEMA_KEY)) { + config.setSchema(commitMetadata.getExtraMetadata().get(HoodieCommitMetadata.SCHEMA_KEY)); + } else { + throw new HoodieIOException("Latest commit does not have any schema in commit metadata"); + } + } else { + throw new HoodieIOException("Deletes issued without any prior commits"); + } + } catch (IOException e) { + throw new HoodieIOException("IOException thrown while reading last commit metadata", e); + } + } /** * Compaction specific private methods */ @@ -1323,4 +1400,16 @@ private void updateMetadataAndRollingStats(String actionType, HoodieCommitMetada } } + /** + * Refers to different operation types + */ + enum OperationType { + INSERT, + INSERT_PREPPED, + UPSERT, + UPSERT_PREPPED, + DELETE, + BULK_INSERT, + BULK_INSERT_PREPPED + } } diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 913baa195cf6f..a391bca2a05b4 100644 --- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -51,6 +51,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { private static final String INSERT_PARALLELISM = "hoodie.insert.shuffle.parallelism"; private static final String BULKINSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"; private static final String UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"; + private static final String DELETE_PARALLELISM = "hoodie.delete.shuffle.parallelism"; private static final String DEFAULT_ROLLBACK_PARALLELISM = "100"; private static final String ROLLBACK_PARALLELISM = "hoodie.rollback.parallelism"; private static 
final String WRITE_BUFFER_LIMIT_BYTES = "hoodie.write.buffer.limit.bytes"; @@ -59,6 +60,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { private static final String DEFAULT_COMBINE_BEFORE_INSERT = "false"; private static final String COMBINE_BEFORE_UPSERT_PROP = "hoodie.combine.before.upsert"; private static final String DEFAULT_COMBINE_BEFORE_UPSERT = "true"; + private static final String COMBINE_BEFORE_DELETE_PROP = "hoodie.combine.before.delete"; + private static final String DEFAULT_COMBINE_BEFORE_DELETE = "true"; private static final String WRITE_STATUS_STORAGE_LEVEL = "hoodie.write.status.storage.level"; private static final String DEFAULT_WRITE_STATUS_STORAGE_LEVEL = "MEMORY_AND_DISK_SER"; private static final String HOODIE_AUTO_COMMIT_PROP = "hoodie.auto.commit"; @@ -119,6 +122,10 @@ public String getSchema() { return props.getProperty(AVRO_SCHEMA); } + public void setSchema(String schemaStr) { + props.setProperty(AVRO_SCHEMA, schemaStr); + } + public String getTableName() { return props.getProperty(TABLE_NAME); } @@ -143,6 +150,10 @@ public int getUpsertShuffleParallelism() { return Integer.parseInt(props.getProperty(UPSERT_PARALLELISM)); } + public int getDeleteShuffleParallelism() { + return Integer.parseInt(props.getProperty(DELETE_PARALLELISM)); + } + public int getRollbackParallelism() { return Integer.parseInt(props.getProperty(ROLLBACK_PARALLELISM)); } @@ -159,6 +170,10 @@ public boolean shouldCombineBeforeUpsert() { return Boolean.parseBoolean(props.getProperty(COMBINE_BEFORE_UPSERT_PROP)); } + public boolean shouldCombineBeforeDelete() { + return Boolean.parseBoolean(props.getProperty(COMBINE_BEFORE_DELETE_PROP)); + } + public StorageLevel getWriteStatusStorageLevel() { return StorageLevel.fromString(props.getProperty(WRITE_STATUS_STORAGE_LEVEL)); } @@ -666,11 +681,14 @@ public HoodieWriteConfig build() { setDefaultOnCondition(props, !props.containsKey(BULKINSERT_PARALLELISM), BULKINSERT_PARALLELISM, DEFAULT_PARALLELISM); setDefaultOnCondition(props, !props.containsKey(UPSERT_PARALLELISM), UPSERT_PARALLELISM, DEFAULT_PARALLELISM); + setDefaultOnCondition(props, !props.containsKey(DELETE_PARALLELISM), DELETE_PARALLELISM, DEFAULT_PARALLELISM); setDefaultOnCondition(props, !props.containsKey(ROLLBACK_PARALLELISM), ROLLBACK_PARALLELISM, DEFAULT_PARALLELISM); setDefaultOnCondition(props, !props.containsKey(COMBINE_BEFORE_INSERT_PROP), COMBINE_BEFORE_INSERT_PROP, DEFAULT_COMBINE_BEFORE_INSERT); setDefaultOnCondition(props, !props.containsKey(COMBINE_BEFORE_UPSERT_PROP), COMBINE_BEFORE_UPSERT_PROP, DEFAULT_COMBINE_BEFORE_UPSERT); + setDefaultOnCondition(props, !props.containsKey(COMBINE_BEFORE_DELETE_PROP), COMBINE_BEFORE_DELETE_PROP, + DEFAULT_COMBINE_BEFORE_DELETE); setDefaultOnCondition(props, !props.containsKey(WRITE_STATUS_STORAGE_LEVEL), WRITE_STATUS_STORAGE_LEVEL, DEFAULT_WRITE_STATUS_STORAGE_LEVEL); setDefaultOnCondition(props, !props.containsKey(HOODIE_AUTO_COMMIT_PROP), HOODIE_AUTO_COMMIT_PROP, diff --git a/hudi-client/src/main/java/org/apache/hudi/index/HoodieIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/HoodieIndex.java index 78a7510340447..68612d7725321 100644 --- a/hudi-client/src/main/java/org/apache/hudi/index/HoodieIndex.java +++ b/hudi-client/src/main/java/org/apache/hudi/index/HoodieIndex.java @@ -72,7 +72,7 @@ public abstract JavaPairRDD>> fetchRecord /** * Looks up the index and tags each incoming record with a location of a file that contains the row (if it is actually - * present) + * present). 
*/ public abstract JavaRDD> tagLocation(JavaRDD> recordRDD, JavaSparkContext jsc, HoodieTable hoodieTable) throws HoodieIndexException; diff --git a/hudi-client/src/test/java/HoodieClientExample.java b/hudi-client/src/test/java/HoodieClientExample.java index a697402055ddd..9e42db1f123df 100644 --- a/hudi-client/src/test/java/HoodieClientExample.java +++ b/hudi-client/src/test/java/HoodieClientExample.java @@ -18,13 +18,16 @@ import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; +import java.util.ArrayList; import java.util.List; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hudi.HoodieWriteClient; import org.apache.hudi.WriteStatus; +import org.apache.hudi.common.HoodieClientTestUtils; import org.apache.hudi.common.HoodieTestDataGenerator; import org.apache.hudi.common.model.HoodieAvroPayload; +import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -93,6 +96,7 @@ public void run() throws Exception { .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 3).build()).build(); HoodieWriteClient client = new HoodieWriteClient(jsc, cfg); + List recordsSoFar = new ArrayList<>(); /** * Write 1 (only inserts) */ @@ -100,6 +104,7 @@ public void run() throws Exception { logger.info("Starting commit " + newCommitTime); List records = dataGen.generateInserts(newCommitTime, 100); + recordsSoFar.addAll(records); JavaRDD writeRecords = jsc.parallelize(records, 1); client.upsert(writeRecords, newCommitTime); @@ -108,10 +113,22 @@ public void run() throws Exception { */ newCommitTime = client.startCommit(); logger.info("Starting commit " + newCommitTime); - records.addAll(dataGen.generateUpdates(newCommitTime, 100)); + List toBeUpdated = dataGen.generateUpdates(newCommitTime, 100); + records.addAll(toBeUpdated); + recordsSoFar.addAll(toBeUpdated); writeRecords = jsc.parallelize(records, 1); client.upsert(writeRecords, newCommitTime); + /** + * Delete 1 + */ + newCommitTime = client.startCommit(); + logger.info("Starting commit " + newCommitTime); + List toBeDeleted = HoodieClientTestUtils + .getKeysToDelete(HoodieClientTestUtils.getHoodieKeys(recordsSoFar), 10); + JavaRDD deleteRecords = jsc.parallelize(toBeDeleted, 1); + client.delete(deleteRecords, newCommitTime); + /** * Schedule a compaction and also perform compaction on a MOR dataset */ diff --git a/hudi-client/src/test/java/org/apache/hudi/TestHoodieClientBase.java b/hudi-client/src/test/java/org/apache/hudi/TestHoodieClientBase.java index 183ea351ffd6a..5200a05d856b7 100644 --- a/hudi-client/src/test/java/org/apache/hudi/TestHoodieClientBase.java +++ b/hudi-client/src/test/java/org/apache/hudi/TestHoodieClientBase.java @@ -35,6 +35,8 @@ import org.apache.hudi.common.HoodieClientTestUtils; import org.apache.hudi.common.HoodieTestDataGenerator; import org.apache.hudi.common.TestRawTripPayload.MetadataMergeWriteStatus; +import org.apache.hudi.common.model.EmptyHoodieRecordPayload; +import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodiePartitionMetadata; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTestUtils; @@ -108,13 +110,23 @@ protected HoodieWriteConfig getConfig() { return getConfigBuilder().build(); } + /** * Get Config builder with default configs set * * @return Config Builder */ HoodieWriteConfig.Builder 
getConfigBuilder() { - return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) + return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA); + } + + /** + * Get Config builder with default configs set + * + * @return Config Builder + */ + HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) { + return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr) .withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2) .withWriteStatusClass(MetadataMergeWriteStatus.class) .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build()) @@ -214,6 +226,29 @@ private Function2, String, Integer> wrapRecordsGenFunctionFor }; } + /** + * Helper to generate delete keys generation function for testing Prepped version of API. Prepped APIs expect the keys + * to be already de-duped and have location set. This wrapper takes care of record-location setting. Uniqueness is + * guaranteed by key-generation function itself. + * + * @param writeConfig Hoodie Write Config + * @param keyGenFunction Keys Generation function + * @return Wrapped function + */ + private Function2, String, Integer> wrapDeleteKeysGenFunctionForPreppedCalls( + final HoodieWriteConfig writeConfig, final Function2, String, Integer> keyGenFunction) { + return (commit, numRecords) -> { + final HoodieIndex index = HoodieIndex.createIndex(writeConfig, jsc); + List records = keyGenFunction.apply(commit, numRecords); + final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true); + HoodieTable table = HoodieTable.getHoodieTable(metaClient, writeConfig, jsc); + JavaRDD recordsToDelete = jsc.parallelize(records, 1) + .map(key -> new HoodieRecord(key, new EmptyHoodieRecordPayload())); + JavaRDD taggedRecords = index.tagLocation(recordsToDelete, jsc, table); + return taggedRecords.map(record -> record.getKey()).collect(); + }; + } + /** * Generate wrapper for record generation function for testing Prepped APIs * @@ -231,6 +266,23 @@ Function2, String, Integer> generateWrapRecordsFn(boolean isP } } + /** + * Generate wrapper for delete key generation function for testing Prepped APIs + * + * @param isPreppedAPI Flag to indicate if this is for testing prepped-version of APIs + * @param writeConfig Hoodie Write Config + * @param wrapped Actual Records Generation function + * @return Wrapped Function + */ + Function2, String, Integer> generateWrapDeleteKeysFn(boolean isPreppedAPI, + HoodieWriteConfig writeConfig, Function2, String, Integer> wrapped) { + if (isPreppedAPI) { + return wrapDeleteKeysGenFunctionForPreppedCalls(writeConfig, wrapped); + } else { + return wrapped; + } + } + /** * Helper to insert first batch of records and do regular assertions on the state after successful completion * @@ -289,6 +341,36 @@ JavaRDD updateBatch(HoodieWriteConfig writeConfig, HoodieWriteClien expTotalCommits); } + /** + * Helper to delete batch of keys and do regular assertions on the state after successful completion + * + * @param writeConfig Hoodie Write Config + * @param client Hoodie Write Client + * @param newCommitTime New Commit Timestamp to be used + * @param prevCommitTime Commit Timestamp used in previous commit + * @param initCommitTime Begin Timestamp (usually "000") + * @param numRecordsInThisCommit Number of records to be added in the new commit + * @param deleteFn Delete Function to be used for deletes + * @param isPreppedAPI Boolean flag to 
indicate writeFn expects prepped records + * @param assertForCommit Enable Assertion of Writes + * @param expRecordsInThisCommit Expected number of records in this commit + * @param expTotalRecords Expected number of records when scanned + * @return RDD of write-status + * @throws Exception in case of error + */ + JavaRDD deleteBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime, + String prevCommitTime, String initCommitTime, + int numRecordsInThisCommit, + Function3, HoodieWriteClient, JavaRDD, String> deleteFn, boolean isPreppedAPI, + boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords) throws Exception { + final Function2, String, Integer> keyGenFunction = + generateWrapDeleteKeysFn(isPreppedAPI, writeConfig, dataGen::generateUniqueDeletes); + + return deleteBatch(client, newCommitTime, prevCommitTime, initCommitTime, numRecordsInThisCommit, + keyGenFunction, + deleteFn, assertForCommit, expRecordsInThisCommit, expTotalRecords); + } + /** * Helper to insert/upsert batch of records and do regular assertions on the state after successful completion * @@ -360,6 +442,68 @@ JavaRDD writeBatch(HoodieWriteClient client, String newCommitTime, return result; } + /** + * Helper to delete batch of hoodie keys and do regular assertions on the state after successful completion + * + * @param client Hoodie Write Client + * @param newCommitTime New Commit Timestamp to be used + * @param prevCommitTime Commit Timestamp used in previous commit + * @param initCommitTime Begin Timestamp (usually "000") + * @param keyGenFunction Key Generation function + * @param deleteFn Write Function to be used for delete + * @param assertForCommit Enable Assertion of Writes + * @param expRecordsInThisCommit Expected number of records in this commit + * @param expTotalRecords Expected number of records when scanned + * @throws Exception in case of error + */ + JavaRDD deleteBatch(HoodieWriteClient client, String newCommitTime, String prevCommitTime, + String initCommitTime, int numRecordsInThisCommit, + Function2, String, Integer> keyGenFunction, + Function3, HoodieWriteClient, JavaRDD, String> deleteFn, + boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords) throws Exception { + + // Delete 1 (only deletes) + client.startCommitWithTime(newCommitTime); + + List keysToDelete = keyGenFunction.apply(newCommitTime, numRecordsInThisCommit); + JavaRDD deleteRecords = jsc.parallelize(keysToDelete, 1); + + JavaRDD result = deleteFn.apply(client, deleteRecords, newCommitTime); + List statuses = result.collect(); + assertNoWriteErrors(statuses); + + // check the partition metadata is written out + assertPartitionMetadata(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, fs); + + // verify that there is a commit + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline(); + + if (assertForCommit) { + assertEquals("Expecting 3 commits.", 3, + timeline.findInstantsAfter(initCommitTime, Integer.MAX_VALUE).countInstants()); + Assert.assertEquals("Latest commit should be " + newCommitTime, newCommitTime, + timeline.lastInstant().get().getTimestamp()); + assertEquals("Must contain " + expRecordsInThisCommit + " records", expRecordsInThisCommit, + HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count()); + + // Check the entire dataset has all records still + String[] fullPartitionPaths = new 
String[dataGen.getPartitionPaths().length]; + for (int i = 0; i < fullPartitionPaths.length; i++) { + fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]); + } + assertEquals("Must contain " + expTotalRecords + " records", expTotalRecords, + HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count()); + + // Check that the incremental consumption from prevCommitTime + assertEquals("Incremental consumption from " + prevCommitTime + " should give no records in latest commit," + + " since it is a delete operation", + HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(), + HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, prevCommitTime).count()); + } + return result; + } + /** * Get Cleaner state corresponding to a partition path * diff --git a/hudi-client/src/test/java/org/apache/hudi/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/src/test/java/org/apache/hudi/TestHoodieClientOnCopyOnWriteStorage.java index c2dedffa7d459..724c3fa291cb4 100644 --- a/hudi-client/src/test/java/org/apache/hudi/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/src/test/java/org/apache/hudi/TestHoodieClientOnCopyOnWriteStorage.java @@ -18,6 +18,8 @@ package org.apache.hudi; +import static org.apache.hudi.common.HoodieTestDataGenerator.NULL_SCHEMA; +import static org.apache.hudi.common.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA; import static org.apache.hudi.common.util.ParquetUtils.readRowKeysFromParquet; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -27,6 +29,7 @@ import static org.mockito.Mockito.when; import java.io.FileInputStream; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -274,6 +277,15 @@ private void testUpsertsInternal(HoodieWriteConfig hoodieWriteConfig, updateBatch(hoodieWriteConfig, client, newCommitTime, prevCommitTime, Option.of(Arrays.asList(commitTimeBetweenPrevAndNew)), initCommitTime, numRecords, writeFn, isPrepped, true, numRecords, 200, 2); + + // Delete 1 + prevCommitTime = newCommitTime; + newCommitTime = "005"; + numRecords = 50; + + deleteBatch(hoodieWriteConfig, client, newCommitTime, prevCommitTime, + initCommitTime, numRecords, HoodieWriteClient::delete, isPrepped, true, + 0, 150); } /** @@ -330,7 +342,7 @@ public void testSmallInsertHandlingForUpserts() throws Exception { final int insertSplitLimit = 100; // setup the small file handling params HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit); // hold upto 200 records max - dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath}); + dataGen = new HoodieTestDataGenerator(new String[]{testPartitionPath}); HoodieWriteClient client = getHoodieWriteClient(config, false); @@ -443,7 +455,7 @@ public void testSmallInsertHandlingForInserts() throws Exception { final int insertSplitLimit = 100; // setup the small file handling params HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit); // hold upto 200 records max - dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath}); + dataGen = new HoodieTestDataGenerator(new String[]{testPartitionPath}); HoodieWriteClient client = getHoodieWriteClient(config, false); // Inserts => will write file1 @@ -455,7 +467,7 @@ public void testSmallInsertHandlingForInserts() throws Exception { List statuses = client.insert(insertRecordsRDD1, commitTime1).collect(); assertNoWriteErrors(statuses); - 
assertPartitionMetadata(new String[] {testPartitionPath}, fs); + assertPartitionMetadata(new String[]{testPartitionPath}, fs); assertEquals("Just 1 file needs to be added.", 1, statuses.size()); String file1 = statuses.get(0).getFileId(); @@ -515,6 +527,164 @@ public void testSmallInsertHandlingForInserts() throws Exception { inserts1.size() + inserts2.size() + insert3.size()); } + /** + * Test delete with delete api + */ + @Test + public void testDeletesWithDeleteApi() throws Exception { + final String testPartitionPath = "2016/09/26"; + final int insertSplitLimit = 100; + List keysSoFar = new ArrayList<>(); + // setup the small file handling params + HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit); // hold upto 200 records max + dataGen = new HoodieTestDataGenerator(new String[]{testPartitionPath}); + + HoodieWriteClient client = getHoodieWriteClient(config, false); + + // Inserts => will write file1 + String commitTime1 = "001"; + client.startCommitWithTime(commitTime1); + List inserts1 = dataGen.generateInserts(commitTime1, insertSplitLimit); // this writes ~500kb + Set keys1 = HoodieClientTestUtils.getRecordKeys(inserts1); + keysSoFar.addAll(keys1); + JavaRDD insertRecordsRDD1 = jsc.parallelize(inserts1, 1); + List statuses = client.upsert(insertRecordsRDD1, commitTime1).collect(); + + assertNoWriteErrors(statuses); + + assertEquals("Just 1 file needs to be added.", 1, statuses.size()); + String file1 = statuses.get(0).getFileId(); + Assert.assertEquals("file should contain 100 records", + readRowKeysFromParquet(jsc.hadoopConfiguration(), new Path(basePath, statuses.get(0).getStat().getPath())) + .size(), + 100); + + // Delete 20 among 100 inserted + testDeletes(client, inserts1, 20, file1, "002", 80, keysSoFar); + + // Insert and update 40 records + Pair, List> updateBatch2 = testUpdates("003", client, 40, 120); + keysSoFar.addAll(updateBatch2.getLeft()); + + // Delete 10 records among 40 updated + testDeletes(client, updateBatch2.getRight(), 10, file1, "004", 110, keysSoFar); + + // do another batch of updates + Pair, List> updateBatch3 = testUpdates("005", client, 40, 150); + keysSoFar.addAll(updateBatch3.getLeft()); + + // delete non existent keys + String commitTime6 = "006"; + client.startCommitWithTime(commitTime6); + + List dummyInserts3 = dataGen.generateInserts(commitTime6, 20); + List hoodieKeysToDelete3 = HoodieClientTestUtils + .getKeysToDelete(HoodieClientTestUtils.getHoodieKeys(dummyInserts3), 20); + JavaRDD deleteKeys3 = jsc.parallelize(hoodieKeysToDelete3, 1); + statuses = client.delete(deleteKeys3, commitTime6).collect(); + assertNoWriteErrors(statuses); + assertEquals("Just 0 write status for delete.", 0, statuses.size()); + + // Check the entire dataset has all records still + String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length]; + for (int i = 0; i < fullPartitionPaths.length; i++) { + fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]); + } + assertEquals("Must contain " + 150 + " records", 150, + HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count()); + + // delete another batch. previous delete commit should have persisted the schema. 
If not, + // this will throw exception + testDeletes(client, updateBatch3.getRight(), 10, file1, "007", 140, keysSoFar); + } + + private Pair, List> testUpdates(String commitTime, HoodieWriteClient client, + int sizeToInsertAndUpdate, int expectedTotalRecords) + throws IOException { + client.startCommitWithTime(commitTime); + List inserts = dataGen.generateInserts(commitTime, sizeToInsertAndUpdate); + Set keys = HoodieClientTestUtils.getRecordKeys(inserts); + List insertsAndUpdates = new ArrayList<>(); + insertsAndUpdates.addAll(inserts); + insertsAndUpdates.addAll(dataGen.generateUpdates(commitTime, inserts)); + + JavaRDD insertAndUpdatesRDD = jsc.parallelize(insertsAndUpdates, 1); + List statuses = client.upsert(insertAndUpdatesRDD, commitTime).collect(); + assertNoWriteErrors(statuses); + + // Check the entire dataset has all records still + String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length]; + for (int i = 0; i < fullPartitionPaths.length; i++) { + fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]); + } + assertEquals("Must contain " + expectedTotalRecords + " records", expectedTotalRecords, + HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count()); + return Pair.of(keys, inserts); + } + + private void testDeletes(HoodieWriteClient client, List previousRecords, int sizeToDelete, + String existingFile, String commitTime, int exepctedRecords, List keys) { + client.startCommitWithTime(commitTime); + + List hoodieKeysToDelete = HoodieClientTestUtils + .getKeysToDelete(HoodieClientTestUtils.getHoodieKeys(previousRecords), sizeToDelete); + JavaRDD deleteKeys = jsc.parallelize(hoodieKeysToDelete, 1); + List statuses = client.delete(deleteKeys, commitTime).collect(); + + assertNoWriteErrors(statuses); + + assertEquals("Just 1 file needs to be added.", 1, statuses.size()); + assertEquals("Existing file should be expanded", existingFile, statuses.get(0).getFileId()); + + // Check the entire dataset has all records still + String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length]; + for (int i = 0; i < fullPartitionPaths.length; i++) { + fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]); + } + assertEquals("Must contain " + exepctedRecords + " records", exepctedRecords, + HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count()); + + Path newFile = new Path(basePath, statuses.get(0).getStat().getPath()); + assertEquals("file should contain 110 records", readRowKeysFromParquet(jsc.hadoopConfiguration(), newFile).size(), + exepctedRecords); + + List records = ParquetUtils.readAvroRecords(jsc.hadoopConfiguration(), newFile); + for (GenericRecord record : records) { + String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(); + assertTrue("key expected to be part of " + commitTime, keys.contains(recordKey)); + assertFalse("Key deleted", hoodieKeysToDelete.contains(recordKey)); + } + } + + /** + * Test delete with delete api + */ + @Test + public void testDeletesWithoutInserts() throws Exception { + final String testPartitionPath = "2016/09/26"; + final int insertSplitLimit = 100; + // setup the small file handling params + HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit, true); // hold upto 200 records max + dataGen = new HoodieTestDataGenerator(new String[]{testPartitionPath}); + + HoodieWriteClient client = getHoodieWriteClient(config, false); + + // delete non existent 
keys + String commitTime1 = "001"; + client.startCommitWithTime(commitTime1); + + List dummyInserts = dataGen.generateInserts(commitTime1, 20); + List hoodieKeysToDelete = HoodieClientTestUtils + .getKeysToDelete(HoodieClientTestUtils.getHoodieKeys(dummyInserts), 20); + JavaRDD deleteKeys = jsc.parallelize(hoodieKeysToDelete, 1); + try { + client.delete(deleteKeys, commitTime1).collect(); + fail("Should have thrown Exception"); + } catch (HoodieIOException e) { + // ignore + } + } + /** * Test to ensure commit metadata points to valid files */ @@ -710,7 +880,14 @@ private Pair> testConsistencyCheck(HoodieTableMetaCli * Build Hoodie Write Config for small data file sizes */ private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize) { - HoodieWriteConfig.Builder builder = getConfigBuilder(); + return getSmallInsertWriteConfig(insertSplitSize, false); + } + + /** + * Build Hoodie Write Config for small data file sizes + */ + private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, boolean useNullSchema) { + HoodieWriteConfig.Builder builder = getConfigBuilder(useNullSchema ? NULL_SCHEMA : TRIP_EXAMPLE_SCHEMA); return builder .withCompactionConfig( HoodieCompactionConfig.newBuilder().compactionSmallFileSize(HoodieTestDataGenerator.SIZE_PER_RECORD * 15) diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestUtils.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestUtils.java index 7890f3a760864..32aa5942c6aa9 100644 --- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestUtils.java +++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestUtils.java @@ -27,6 +27,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Random; import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; @@ -39,6 +40,7 @@ import org.apache.hudi.avro.HoodieAvroWriteSupport; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieDataFile; +import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTestUtils; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -69,6 +71,7 @@ public class HoodieClientTestUtils { private static final transient Logger log = LogManager.getLogger(HoodieClientTestUtils.class); + private static final Random RANDOM = new Random(); public static List collectStatuses(Iterator> statusListItr) { List statuses = new ArrayList<>(); @@ -86,6 +89,27 @@ public static Set getRecordKeys(List hoodieRecords) { return keys; } + public static List getHoodieKeys(List hoodieRecords) { + List keys = new ArrayList<>(); + for (HoodieRecord rec : hoodieRecords) { + keys.add(rec.getKey()); + } + return keys; + } + + public static List getKeysToDelete(List keys, int size) { + List toReturn = new ArrayList<>(); + int counter = 0; + while (counter < size) { + int index = RANDOM.nextInt(keys.size()); + if (!toReturn.contains(keys.get(index))) { + toReturn.add(keys.get(index)); + counter++; + } + } + return toReturn; + } + private static void fakeMetaFile(String basePath, String commitTime, String suffix) throws IOException { String parentPath = basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME; new File(parentPath).mkdirs(); diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java index 
68be7d33bca90..52aae1be7daf1 100644 --- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java +++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java @@ -78,6 +78,7 @@ public class HoodieTestDataGenerator { + "{\"name\": \"begin_lat\", \"type\": \"double\"}," + "{\"name\": \"begin_lon\", \"type\": \"double\"}," + "{\"name\": \"end_lat\", \"type\": \"double\"}," + "{\"name\": \"end_lon\", \"type\": \"double\"}," + "{\"name\":\"fare\",\"type\": \"double\"}]}"; + public static String NULL_SCHEMA = Schema.create(Schema.Type.NULL).toString(); public static String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double,double"; public static Schema avroSchema = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA); public static Schema avroSchemaWithMetadataFields = HoodieAvroUtils.addMetadataFields(avroSchema); @@ -302,7 +303,8 @@ public List generateUpdates(String commitTime, List } /** - * Generates new updates, randomly distributed across the keys above. There can be duplicates within the returned list + * Generates new updates, randomly distributed across the keys above. There can be duplicates within the returned + * list * * @param commitTime Commit Timestamp * @param n Number of updates (including dups) @@ -329,6 +331,17 @@ public List generateUniqueUpdates(String commitTime, Integer n) { return generateUniqueUpdatesStream(commitTime, n).collect(Collectors.toList()); } + /** + * Generates deduped delete of keys previously inserted, randomly distributed across the keys above. + * + * @param commitTime Commit Timestamp + * @param n Number of unique records + * @return list of hoodie record updates + */ + public List generateUniqueDeletes(String commitTime, Integer n) { + return generateUniqueDeleteStream(commitTime, n).collect(Collectors.toList()); + } + /** * Generates deduped updates of keys previously inserted, randomly distributed across the keys above. * @@ -360,6 +373,33 @@ public Stream generateUniqueUpdatesStream(String commitTime, Integ }); } + /** + * Generates deduped delete of keys previously inserted, randomly distributed across the keys above. + * + * @param commitTime Commit Timestamp + * @param n Number of unique records + * @return stream of hoodie record updates + */ + public Stream generateUniqueDeleteStream(String commitTime, Integer n) { + final Set used = new HashSet<>(); + + if (n > numExistingKeys) { + throw new IllegalArgumentException("Requested unique deletes is greater than number of available keys"); + } + + return IntStream.range(0, n).boxed().map(i -> { + int index = numExistingKeys == 1 ? 0 : rand.nextInt(numExistingKeys - 1); + KeyPartition kp = existingKeys.get(index); + // Find the available keyPartition starting from randomly chosen one. 
+ while (used.contains(kp)) { + index = (index + 1) % numExistingKeys; + kp = existingKeys.get(index); + } + used.add(kp); + return kp.key; + }); + } + public String[] getPartitionPaths() { return partitionPaths; } @@ -369,6 +409,7 @@ public int getNumExistingKeys() { } public static class KeyPartition implements Serializable { + HoodieKey key; String partitionPath; } diff --git a/hudi-spark/src/main/java/org/apache/hudi/EmptyHoodieRecordPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/EmptyHoodieRecordPayload.java similarity index 93% rename from hudi-spark/src/main/java/org/apache/hudi/EmptyHoodieRecordPayload.java rename to hudi-common/src/main/java/org/apache/hudi/common/model/EmptyHoodieRecordPayload.java index ddcbeb74e4ca0..b9e122a1dacc4 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/EmptyHoodieRecordPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/EmptyHoodieRecordPayload.java @@ -16,12 +16,11 @@ * limitations under the License. */ -package org.apache.hudi; +package org.apache.hudi.common.model; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; -import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.util.Option; /** @@ -29,7 +28,11 @@ */ public class EmptyHoodieRecordPayload implements HoodieRecordPayload { - public EmptyHoodieRecordPayload(GenericRecord record, Comparable orderingVal) {} + public EmptyHoodieRecordPayload() { + } + + public EmptyHoodieRecordPayload(GenericRecord record, Comparable orderingVal) { + } @Override public EmptyHoodieRecordPayload preCombine(EmptyHoodieRecordPayload another) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java index 5eb8ce46ff9e2..fb70b37d6a8dc 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java @@ -40,6 +40,7 @@ @JsonIgnoreProperties(ignoreUnknown = true) public class HoodieCommitMetadata implements Serializable { + public static final String SCHEMA_KEY = "schema"; private static volatile Logger log = LogManager.getLogger(HoodieCommitMetadata.class); protected Map> partitionToWriteStats; protected Boolean compacted; diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java index 55b64db8d06fc..7e73460653d48 100644 --- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java @@ -147,15 +147,15 @@ public void testRunHoodieJavaApp(String hiveTableName, String tableType, Partiti stdOutErr = executeHiveCommand("show tables like '" + hiveTableName + "'"); Assert.assertEquals("Table exists", hiveTableName, stdOutErr.getLeft()); - // Ensure row count is 100 (without duplicates) + // Ensure row count is 80 (without duplicates) (100 - 20 deleted) stdOutErr = executeHiveCommand("select count(1) from " + hiveTableName); - Assert.assertEquals("Expecting 100 rows to be present in the new table", 100, + Assert.assertEquals("Expecting 100 rows to be present in the new table", 80, Integer.parseInt(stdOutErr.getLeft().trim())); - // If is MOR table, ensure realtime table row count is 100 (without duplicates) + // If is MOR table, ensure 
realtime table row count is 100 - 20 = 80 (without duplicates) if (tableType.equals(HoodieTableType.MERGE_ON_READ.name())) { stdOutErr = executeHiveCommand("select count(1) from " + hiveTableName + "_rt"); - Assert.assertEquals("Expecting 100 rows to be present in the realtime table,", 100, + Assert.assertEquals("Expecting 100 rows to be present in the realtime table,", 80, Integer.parseInt(stdOutErr.getLeft().trim())); } @@ -167,7 +167,7 @@ public void testRunHoodieJavaApp(String hiveTableName, String tableType, Partiti // Run the count query again. Without Hoodie, all versions are included. So we get a wrong count stdOutErr = executeHiveCommand("select count(1) from " + hiveTableName); - Assert.assertEquals("Expecting 200 rows to be present in the new table", 200, + Assert.assertEquals("Expecting 280 rows to be present in the new table", 280, Integer.parseInt(stdOutErr.getLeft().trim())); } diff --git a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java index 9ce79e90fbb74..90128b8b5bb1f 100644 --- a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java +++ b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java @@ -92,9 +92,9 @@ public static Object getNestedFieldVal(GenericRecord record, String fieldName) { /** * Create a key generator class via reflection, passing in any configs needed. * - * If the class name of key generator is configured through the properties file, i.e., {@code - * props}, use the corresponding key generator class; otherwise, use the default key generator class specified in - * {@code DataSourceWriteOptions}. + * If the class name of key generator is configured through the properties file, i.e., {@code props}, use the + * corresponding key generator class; otherwise, use the default key generator class specified in {@code + * DataSourceWriteOptions}. 
*/ public static KeyGenerator createKeyGenerator(TypedProperties props) throws IOException { String keyGeneratorClass = props.getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY(), @@ -124,7 +124,7 @@ public static HoodieRecordPayload createPayload(String payloadClass, GenericReco throws IOException { try { return (HoodieRecordPayload) ReflectionUtils.loadClass(payloadClass, - new Class[] {GenericRecord.class, Comparable.class}, record, orderingVal); + new Class[]{GenericRecord.class, Comparable.class}, record, orderingVal); } catch (Throwable e) { throw new IOException("Could not create payload for class: " + payloadClass, e); } @@ -172,6 +172,11 @@ public static JavaRDD doWriteOperation(HoodieWriteClient client, Ja } } + public static JavaRDD doDeleteOperation(HoodieWriteClient client, JavaRDD hoodieKeys, + String commitTime) { + return client.delete(hoodieKeys, commitTime); + } + public static HoodieRecord createHoodieRecord(GenericRecord gr, Comparable orderingVal, HoodieKey hKey, String payloadClass) throws IOException { HoodieRecordPayload payload = DataSourceUtils.createPayload(payloadClass, gr, orderingVal); diff --git a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala index e857b2569b117..3a97be1a6d8e4 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala @@ -20,6 +20,7 @@ package org.apache.hudi import com.databricks.spark.avro.SchemaConverters import org.apache.avro.generic.GenericRecord import org.apache.avro.{Schema, SchemaBuilder} +import org.apache.hudi.common.model.HoodieKey import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.types._ @@ -41,6 +42,10 @@ object AvroConversionUtils { } } + def createRddForDeletes(df: DataFrame, rowField: String, partitionField: String): RDD[HoodieKey] = { + df.rdd.map(row => (new HoodieKey(row.getAs[String](rowField), row.getAs[String](partitionField)))) + } + def createDataFrame(rdd: RDD[GenericRecord], schemaStr: String, ss: SparkSession): Dataset[Row] = { if (rdd.isEmpty()) { ss.emptyDataFrame diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala index bbcc98c87b3e6..81d13ada8a379 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -66,8 +66,8 @@ object DataSourceReadOptions { /** * For use-cases like DeltaStreamer which reads from Hoodie Incremental table and applies opaque map functions, - * filters appearing late in the sequence of transformations cannot be automatically pushed down. - * This option allows setting filters directly on Hoodie Source + * filters appearing late in the sequence of transformations cannot be automatically pushed down. 
+ * This option allows setting filters directly on Hoodie Source */ val PUSH_DOWN_INCR_FILTERS_OPT_KEY = "hoodie.datasource.read.incr.filters" } @@ -85,6 +85,7 @@ object DataSourceWriteOptions { val BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert" val INSERT_OPERATION_OPT_VAL = "insert" val UPSERT_OPERATION_OPT_VAL = "upsert" + val DELETE_OPERATION_OPT_VAL = "delete" val DEFAULT_OPERATION_OPT_VAL = UPSERT_OPERATION_OPT_VAL /** @@ -152,31 +153,31 @@ object DataSourceWriteOptions { val DEFAULT_COMMIT_METADATA_KEYPREFIX_OPT_VAL = "_" /** - * Flag to indicate whether to drop duplicates upon insert. - * By default insert will accept duplicates, to gain extra performance. - */ + * Flag to indicate whether to drop duplicates upon insert. + * By default insert will accept duplicates, to gain extra performance. + */ val INSERT_DROP_DUPS_OPT_KEY = "hoodie.datasource.write.insert.drop.duplicates" val DEFAULT_INSERT_DROP_DUPS_OPT_VAL = "false" /** - * Flag to indicate how many times streaming job should retry for a failed microbatch - * By default 3 - */ + * Flag to indicate how many times streaming job should retry for a failed microbatch + * By default 3 + */ val STREAMING_RETRY_CNT_OPT_KEY = "hoodie.datasource.write.streaming.retry.count" val DEFAULT_STREAMING_RETRY_CNT_OPT_VAL = "3" /** - * Flag to indicate how long (by millisecond) before a retry should issued for failed microbatch - * By default 2000 and it will be doubled by every retry - */ + * Flag to indicate how long (by millisecond) before a retry should issued for failed microbatch + * By default 2000 and it will be doubled by every retry + */ val STREAMING_RETRY_INTERVAL_MS_OPT_KEY = "hoodie.datasource.write.streaming.retry.interval.ms" val DEFAULT_STREAMING_RETRY_INTERVAL_MS_OPT_VAL = "2000" /** - * Flag to indicate whether to ignore any non exception error (e.g. writestatus error) - * within a streaming microbatch - * By default true (in favor of streaming progressing over data integrity) - */ + * Flag to indicate whether to ignore any non exception error (e.g. 
writestatus error) + * within a streaming microbatch + * By default true (in favor of streaming progressing over data integrity) + */ val STREAMING_IGNORE_FAILED_BATCH_OPT_KEY = "hoodie.datasource.write.streaming.ignore.failed.batch" val DEFAULT_STREAMING_IGNORE_FAILED_BATCH_OPT_VAL = "true" diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index f8bd2df7b6d63..eeb40ca693691 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -19,6 +19,7 @@ package org.apache.hudi import java.util +import org.apache.avro.Schema import org.apache.avro.generic.GenericRecord import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.hive.conf.HiveConf @@ -29,7 +30,7 @@ import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.exception.HoodieException import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool} import org.apache.log4j.LogManager -import org.apache.spark.api.java.JavaSparkContext +import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode} @@ -72,131 +73,215 @@ private[hudi] object HoodieSparkSqlWriter { parameters(OPERATION_OPT_KEY) } - // register classes & schemas - val structName = s"${tblName.get}_record" - val nameSpace = s"hoodie.${tblName.get}" - sparkContext.getConf.registerKryoClasses( - Array(classOf[org.apache.avro.generic.GenericData], - classOf[org.apache.avro.Schema])) - val schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace) - sparkContext.getConf.registerAvroSchemas(schema) - log.info(s"Registered avro schema : ${schema.toString(true)}") - - // Convert to RDD[HoodieRecord] - val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters)) - val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace) - val hoodieAllIncomingRecords = genericRecords.map(gr => { - val orderingVal = DataSourceUtils.getNestedFieldValAsString( - gr, parameters(PRECOMBINE_FIELD_OPT_KEY)).asInstanceOf[Comparable[_]] - DataSourceUtils.createHoodieRecord(gr, - orderingVal, keyGenerator.getKey(gr), parameters(PAYLOAD_CLASS_OPT_KEY)) - }).toJavaRDD() + var writeSuccessful: Boolean = false + var commitTime: String = null + var writeStatuses: JavaRDD[WriteStatus] = null val jsc = new JavaSparkContext(sparkContext) - val basePath = new Path(parameters("path")) val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration) var exists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME)) - // Handle various save modes - if (mode == SaveMode.ErrorIfExists && exists) { - throw new HoodieException(s"hoodie dataset at $basePath already exists.") - } - if (mode == SaveMode.Ignore && exists) { - log.warn(s"hoodie dataset at $basePath already exists. Ignoring & not performing actual writes.") - return (true, common.util.Option.empty()) - } - if (mode == SaveMode.Overwrite && exists) { - log.warn(s"hoodie dataset at $basePath already exists. Deleting existing data & overwriting with new data.") - fs.delete(basePath, true) - exists = false - } + // Running into issues wrt generic type conversion from Java to Scala. Couldn't make common code paths for + // write and deletes. Specifically, instantiating client of type HoodieWriteClient + // is having issues. 
Hence some codes blocks are same in both if and else blocks. + if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) { + // register classes & schemas + val structName = s"${tblName.get}_record" + val nameSpace = s"hoodie.${tblName.get}" + sparkContext.getConf.registerKryoClasses( + Array(classOf[org.apache.avro.generic.GenericData], + classOf[org.apache.avro.Schema])) + val schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace) + sparkContext.getConf.registerAvroSchemas(schema) + log.info(s"Registered avro schema : ${schema.toString(true)}") - // Create the dataset if not present - if (!exists) { - HoodieTableMetaClient.initTableType(sparkContext.hadoopConfiguration, path.get, storageType, - tblName.get, "archived") - } + // Convert to RDD[HoodieRecord] + val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters)) + val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace) + val hoodieAllIncomingRecords = genericRecords.map(gr => { + val orderingVal = DataSourceUtils.getNestedFieldValAsString( + gr, parameters(PRECOMBINE_FIELD_OPT_KEY)).asInstanceOf[Comparable[_]] + DataSourceUtils.createHoodieRecord(gr, + orderingVal, keyGenerator.getKey(gr), parameters(PAYLOAD_CLASS_OPT_KEY)) + }).toJavaRDD() - // Create a HoodieWriteClient & issue the write. - val client = DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get, tblName.get, - mapAsJavaMap(parameters) - ) - - val hoodieRecords = - if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean) { - DataSourceUtils.dropDuplicates( - jsc, - hoodieAllIncomingRecords, - mapAsJavaMap(parameters), client.getTimelineServer) - } else { - hoodieAllIncomingRecords + // Handle various save modes + if (mode == SaveMode.ErrorIfExists && exists) { + throw new HoodieException(s"hoodie dataset at $basePath already exists.") } - - if (hoodieRecords.isEmpty()) { - log.info("new batch has no new records, skipping...") - return (true, common.util.Option.empty()) - } - - val commitTime = client.startCommit() - - val writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, commitTime, operation) - // Check for errors and commit the write. - val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count() - val writeSuccessful = - if (errorCount == 0) { - log.info("No errors. Proceeding to commit the write.") - val metaMap = parameters.filter(kv => - kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX_OPT_KEY))) - val commitSuccess = if (metaMap.isEmpty) { - client.commit(commitTime, writeStatuses) - } else { - client.commit(commitTime, writeStatuses, - common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap)))) + if (mode == SaveMode.Ignore && exists) { + log.warn(s"hoodie dataset at $basePath already exists. Ignoring & not performing actual writes.") + return (true, common.util.Option.empty()) } - - if (commitSuccess) { - log.info("Commit " + commitTime + " successful!") + if (mode == SaveMode.Overwrite && exists) { + log.warn(s"hoodie dataset at $basePath already exists. 
Deleting existing data & overwriting with new data.") + fs.delete(basePath, true) + exists = false } - else { - log.info("Commit " + commitTime + " failed!") + + // Create the dataset if not present + if (!exists) { + HoodieTableMetaClient.initTableType(sparkContext.hadoopConfiguration, path.get, storageType, + tblName.get, "archived") } - val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean) - val syncHiveSucess = if (hiveSyncEnabled) { - log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")") - val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration) - syncHive(basePath, fs, parameters) - } else { - true + // Create a HoodieWriteClient & issue the write. + val client = DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get, tblName.get, + mapAsJavaMap(parameters) + ) + + val hoodieRecords = + if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean) { + DataSourceUtils.dropDuplicates( + jsc, + hoodieAllIncomingRecords, + mapAsJavaMap(parameters), client.getTimelineServer) + } else { + hoodieAllIncomingRecords + } + + if (hoodieRecords.isEmpty()) { + log.info("new batch has no new records, skipping...") + return (true, common.util.Option.empty()) } - client.close() - commitSuccess && syncHiveSucess + commitTime = client.startCommit() + writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, commitTime, operation) + // Check for errors and commit the write. + val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count() + writeSuccessful = + if (errorCount == 0) { + log.info("No errors. Proceeding to commit the write.") + val metaMap = parameters.filter(kv => + kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX_OPT_KEY))) + val commitSuccess = if (metaMap.isEmpty) { + client.commit(commitTime, writeStatuses) + } else { + client.commit(commitTime, writeStatuses, + common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap)))) + } + + if (commitSuccess) { + log.info("Commit " + commitTime + " successful!") + } + else { + log.info("Commit " + commitTime + " failed!") + } + + val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean) + val syncHiveSucess = if (hiveSyncEnabled) { + log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")") + val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration) + syncHive(basePath, fs, parameters) + } else { + true + } + client.close() + commitSuccess && syncHiveSucess + } else { + log.error(s"$operation failed with ${errorCount} errors :"); + if (log.isTraceEnabled) { + log.trace("Printing out the top 100 errors") + writeStatuses.rdd.filter(ws => ws.hasErrors) + .take(100) + .foreach(ws => { + log.trace("Global error :", ws.getGlobalError) + if (ws.getErrors.size() > 0) { + ws.getErrors.foreach(kt => + log.trace(s"Error for key: ${kt._1}", kt._2)) + } + }) + } + false + } } else { - log.error(s"$operation failed with ${errorCount} errors :"); - if (log.isTraceEnabled) { - log.trace("Printing out the top 100 errors") - writeStatuses.rdd.filter(ws => ws.hasErrors) - .take(100) - .foreach(ws => { - log.trace("Global error :", ws.getGlobalError) - if (ws.getErrors.size() > 0) { - ws.getErrors.foreach(kt => - log.trace(s"Error for key: ${kt._1}", kt._2)) - } - }) + + // Handle save modes + if (mode != SaveMode.Append) { + throw new HoodieException(s"Append is the only save mode applicable for $operation operation") } - false + + val structName = s"${tblName.get}_record" + val 
nameSpace = s"hoodie.${tblName.get}" + sparkContext.getConf.registerKryoClasses( + Array(classOf[org.apache.avro.generic.GenericData], + classOf[org.apache.avro.Schema])) + + // Convert to RDD[HoodieKey] + val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters)) + val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace) + val hoodieKeysToDelete = genericRecords.map(gr => keyGenerator.getKey(gr)).toJavaRDD() + + if (!exists) { + throw new HoodieException(s"hoodie dataset at $basePath does not exist") + } + + // Create a HoodieWriteClient & issue the delete. + val client = DataSourceUtils.createHoodieClient(jsc, + Schema.create(Schema.Type.NULL).toString, path.get, tblName.get, + mapAsJavaMap(parameters) + ) + + // Issue deletes + commitTime = client.startCommit() + writeStatuses = DataSourceUtils.doDeleteOperation(client, hoodieKeysToDelete, commitTime) + val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count() + writeSuccessful = + if (errorCount == 0) { + log.info("No errors. Proceeding to commit the write.") + val metaMap = parameters.filter(kv => + kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX_OPT_KEY))) + val commitSuccess = if (metaMap.isEmpty) { + client.commit(commitTime, writeStatuses) + } else { + client.commit(commitTime, writeStatuses, + common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap)))) + } + + if (commitSuccess) { + log.info("Commit " + commitTime + " successful!") + } + else { + log.info("Commit " + commitTime + " failed!") + } + + val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean) + val syncHiveSucess = if (hiveSyncEnabled) { + log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")") + val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration) + syncHive(basePath, fs, parameters) + } else { + true + } + client.close() + commitSuccess && syncHiveSucess + } else { + log.error(s"$operation failed with ${errorCount} errors :"); + if (log.isTraceEnabled) { + log.trace("Printing out the top 100 errors") + writeStatuses.rdd.filter(ws => ws.hasErrors) + .take(100) + .foreach(ws => { + log.trace("Global error :", ws.getGlobalError) + if (ws.getErrors.size() > 0) { + ws.getErrors.foreach(kt => + log.trace(s"Error for key: ${kt._1}", kt._2)) + } + }) + } + false + } } + (writeSuccessful, common.util.Option.ofNullable(commitTime)) } /** - * Add default options for unspecified write options keys. - * - * @param parameters - * @return - */ + * Add default options for unspecified write options keys. 
+ * + * @param parameters + * @return + */ def parametersWithWriteDefaults(parameters: Map[String, String]): Map[String, String] = { Map(OPERATION_OPT_KEY -> DEFAULT_OPERATION_OPT_VAL, STORAGE_TYPE_OPT_KEY -> DEFAULT_STORAGE_TYPE_OPT_VAL, diff --git a/hudi-spark/src/test/java/DataSourceTestUtils.java b/hudi-spark/src/test/java/DataSourceTestUtils.java index 9f85b5391496b..15fea332eae2b 100644 --- a/hudi-spark/src/test/java/DataSourceTestUtils.java +++ b/hudi-spark/src/test/java/DataSourceTestUtils.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.stream.Collectors; import org.apache.hudi.common.TestRawTripPayload; +import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.Option; @@ -42,4 +43,10 @@ public static List convertToStringList(List records) { return records.stream().map(hr -> convertToString(hr)).filter(os -> os.isPresent()).map(os -> os.get()) .collect(Collectors.toList()); } + + public static List convertKeysToStringList(List keys) { + return keys.stream() + .map(hr -> "{\"_row_key\":\"" + hr.getRecordKey() + "\",\"partition\":\"" + hr.getPartitionPath() + "\"}") + .collect(Collectors.toList()); + } } diff --git a/hudi-spark/src/test/java/HoodieJavaApp.java b/hudi-spark/src/test/java/HoodieJavaApp.java index 32ae31bf67890..50ac65ca42107 100644 --- a/hudi-spark/src/test/java/HoodieJavaApp.java +++ b/hudi-spark/src/test/java/HoodieJavaApp.java @@ -18,6 +18,7 @@ import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; +import java.util.ArrayList; import java.util.List; import org.apache.hadoop.fs.FileSystem; import org.apache.hudi.DataSourceReadOptions; @@ -25,7 +26,9 @@ import org.apache.hudi.HoodieDataSourceHelpers; import org.apache.hudi.NonpartitionedKeyGenerator; import org.apache.hudi.SimpleKeyGenerator; +import org.apache.hudi.common.HoodieClientTestUtils; import org.apache.hudi.common.HoodieTestDataGenerator; +import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.hive.MultiPartKeysValueExtractor; @@ -105,16 +108,18 @@ public void run() throws Exception { HoodieTestDataGenerator dataGen = null; if (nonPartitionedTable) { // All data goes to base-path - dataGen = new HoodieTestDataGenerator(new String[] {""}); + dataGen = new HoodieTestDataGenerator(new String[]{""}); } else { dataGen = new HoodieTestDataGenerator(); } + List recordsSoFar = new ArrayList<>(); /** * Commit with only inserts */ // Generate some input.. 
- List records1 = DataSourceTestUtils.convertToStringList(dataGen.generateInserts("001"/* ignore */, 100)); + recordsSoFar.addAll(dataGen.generateInserts("001"/* ignore */, 100)); + List records1 = DataSourceTestUtils.convertToStringList(recordsSoFar); Dataset inputDF1 = spark.read().json(jssc.parallelize(records1, 2)); // Save as hoodie dataset (copy on write) @@ -152,7 +157,9 @@ public void run() throws Exception { /** * Commit that updates records */ - List records2 = DataSourceTestUtils.convertToStringList(dataGen.generateUpdates("002"/* ignore */, 100)); + List recordsToBeUpdated = dataGen.generateUpdates("002"/* ignore */, 100); + recordsSoFar.addAll(recordsToBeUpdated); + List records2 = DataSourceTestUtils.convertToStringList(recordsToBeUpdated); Dataset inputDF2 = spark.read().json(jssc.parallelize(records2, 2)); writer = inputDF2.write().format("org.apache.hudi").option("hoodie.insert.shuffle.parallelism", "2") .option("hoodie.upsert.shuffle.parallelism", "2") @@ -168,7 +175,31 @@ public void run() throws Exception { updateHiveSyncConfig(writer); writer.save(tablePath); String commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, tablePath); - logger.info("Second commit at instant time :" + commitInstantTime1); + logger.info("Second commit at instant time :" + commitInstantTime2); + + /** + * Commit that Deletes some records + */ + List deletes = DataSourceTestUtils.convertKeysToStringList( + HoodieClientTestUtils + .getKeysToDelete(HoodieClientTestUtils.getHoodieKeys(recordsSoFar), 20)); + Dataset inputDF3 = spark.read().json(jssc.parallelize(deletes, 2)); + writer = inputDF3.write().format("org.apache.hudi").option("hoodie.insert.shuffle.parallelism", "2") + .option("hoodie.upsert.shuffle.parallelism", "2") + .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY(), tableType) // Hoodie Table Type + .option(DataSourceWriteOptions.OPERATION_OPT_KEY(), "delete") + .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key") + .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition") + .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "_row_key") + .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY(), + nonPartitionedTable ? 
NonpartitionedKeyGenerator.class.getCanonicalName() + : SimpleKeyGenerator.class.getCanonicalName()) // Add Key Extractor + .option(HoodieWriteConfig.TABLE_NAME, tableName).mode(SaveMode.Append); + + updateHiveSyncConfig(writer); + writer.save(tablePath); + String commitInstantTime3 = HoodieDataSourceHelpers.latestCommit(fs, tablePath); + logger.info("Third commit at instant time :" + commitInstantTime3); /** * Read & do some queries @@ -200,9 +231,6 @@ public void run() throws Exception { /** * Setup configs for syncing to hive - * - * @param writer - * @return */ private DataFrameWriter updateHiveSyncConfig(DataFrameWriter writer) { if (enableHiveSync) { diff --git a/hudi-spark/src/test/scala/TestDataSourceDefaults.scala b/hudi-spark/src/test/scala/TestDataSourceDefaults.scala index 1f6cd7fe3cac0..a2a1804d7c2af 100644 --- a/hudi-spark/src/test/scala/TestDataSourceDefaults.scala +++ b/hudi-spark/src/test/scala/TestDataSourceDefaults.scala @@ -16,9 +16,10 @@ */ import org.apache.avro.generic.GenericRecord +import org.apache.hudi.common.model.EmptyHoodieRecordPayload import org.apache.hudi.common.util.{Option, SchemaTestUtil, TypedProperties} import org.apache.hudi.exception.HoodieException -import org.apache.hudi.{ComplexKeyGenerator, DataSourceWriteOptions, EmptyHoodieRecordPayload, OverwriteWithLatestAvroPayload, SimpleKeyGenerator} +import org.apache.hudi.{ComplexKeyGenerator, DataSourceWriteOptions, OverwriteWithLatestAvroPayload, SimpleKeyGenerator} import org.junit.Assert._ import org.junit.{Before, Test} import org.scalatest.junit.AssertionsForJUnit
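
A minimal usage sketch of the new "delete" operation, assuming a dataset written with the default SimpleKeyGenerator where "_row_key" is the record key field and "partition" is the partition path field (the same configuration HoodieJavaApp exercises above); keysDF, tableName and tablePath are placeholder names:

// Sketch: issue deletes through the Hudi datasource via the new "delete" operation.
// Assumes keysDF carries "_row_key" and "partition" columns matching the key generator
// configuration; tableName and tablePath are illustrative placeholders.
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.{DataFrame, SaveMode}

object HoodieDeleteSketch {
  def deleteKeys(keysDF: DataFrame, tableName: String, tablePath: String): Unit = {
    keysDF.write.format("org.apache.hudi")
      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partition")
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "_row_key")
      .option(HoodieWriteConfig.TABLE_NAME, tableName)
      .mode(SaveMode.Append) // the delete path accepts only SaveMode.Append
      .save(tablePath)
  }
}

As HoodieSparkSqlWriter enforces above, any save mode other than Append fails for deletes; each incoming row is converted to a HoodieKey by the configured key generator and handed to HoodieWriteClient's delete API, so columns beyond the key and partition fields are ignored.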