8 changes: 2 additions & 6 deletions docker/demo/config/test-suite/cow-spark-long-running.yaml
@@ -25,11 +25,6 @@ dag_content:
num_records_insert: 10000
type: SparkInsertNode
deps: none
first_validate:
config:
validate_hive: false
type: ValidateDatasetNode
deps: first_insert
first_upsert:
config:
record_size: 200
@@ -39,7 +34,7 @@
num_records_upsert: 3000
num_partitions_upsert: 50
type: SparkUpsertNode
deps: first_validate
deps: first_insert
first_delete:
config:
num_partitions_delete: 50
@@ -48,6 +43,7 @@
deps: first_upsert
second_validate:
config:
validate_once_every_itr : 5
validate_hive: false
delete_input_data: true
type: ValidateDatasetNode
(next file; name not captured in this view)
@@ -47,11 +47,6 @@ dag_content:
engine: "mr"
type: HiveSyncNode
deps: third_insert
first_validate:
config:
validate_hive: false
type: ValidateDatasetNode
deps: first_hive_sync
first_upsert:
config:
record_size: 1000
@@ -61,7 +56,7 @@
num_records_upsert: 100
num_partitions_upsert: 1
type: UpsertNode
deps: first_validate
deps: first_hive_sync
first_delete:
config:
num_partitions_delete: 50
@@ -76,6 +71,7 @@
deps: first_delete
second_validate:
config:
validate_once_every_itr : 5
validate_hive: true
delete_input_data: true
type: ValidateDatasetNode
(next file; name not captured in this view)
@@ -59,6 +59,7 @@ dag_content:
deps: first_upsert
second_validate:
config:
validate_once_every_itr : 5
validate_hive: false
delete_input_data: true
type: ValidateDatasetNode
(next file; name not captured in this view)
@@ -59,6 +59,7 @@ dag_content:
deps: first_upsert
second_validate:
config:
validate_once_every_itr : 5
validate_hive: false
delete_input_data: true
type: ValidateDatasetNode
(next file; name not captured in this view)
@@ -62,6 +62,7 @@ dag_content:
deps: first_upsert
second_validate:
config:
validate_once_every_itr : 5
validate_hive: false
delete_input_data: false
type: ValidateDatasetNode
(next file; name not captured in this view)
@@ -41,11 +41,6 @@ dag_content:
num_records_insert: 300
deps: second_insert
type: InsertNode
first_validate:
config:
validate_hive: false
type: ValidateDatasetNode
deps: third_insert
first_upsert:
config:
record_size: 1000
@@ -55,7 +50,7 @@
num_records_upsert: 100
num_partitions_upsert: 1
type: UpsertNode
deps: first_validate
deps: third_insert
first_delete:
config:
num_partitions_delete: 1
@@ -64,6 +59,7 @@
deps: first_upsert
second_validate:
config:
validate_once_every_itr : 5
validate_hive: false
delete_input_data: true
type: ValidateDatasetNode
(next file; name not captured in this view)
@@ -89,6 +89,7 @@ public static class Config {
private static String START_PARTITION = "start_partition";
private static String DELETE_INPUT_DATA = "delete_input_data";
private static String VALIDATE_HIVE = "validate_hive";
private static String VALIDATE_ONCE_EVERY_ITR = "validate_once_every_itr";
private static String EXECUTE_ITR_COUNT = "execute_itr_count";
private static String VALIDATE_ARCHIVAL = "validate_archival";
private static String VALIDATE_CLEAN = "validate_clean";
@@ -216,6 +217,10 @@ public boolean isValidateHive() {
return Boolean.valueOf(configsMap.getOrDefault(VALIDATE_HIVE, false).toString());
}

public int validateOnceEveryIteration() {
return Integer.valueOf(configsMap.getOrDefault(VALIDATE_ONCE_EVERY_ITR, 1).toString());
}

public boolean isValidateFullData() {
return Boolean.valueOf(configsMap.getOrDefault(VALIDATE_FULL_DATA, false).toString());
}
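For reference, a minimal sketch of how the new knob resolves, assuming a plain Map<String, Object> in place of the test suite's config map; the class and method names below are illustrative and not part of this PR. The default of 1 means a node that omits the key still validates on every iteration, preserving the previous behavior.

import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: mirrors the getOrDefault pattern used in Config above.
public class ValidateEveryItrSketch {
  private static final String VALIDATE_ONCE_EVERY_ITR = "validate_once_every_itr";

  // Resolves validate_once_every_itr from a node's config map, defaulting to 1.
  static int validateOnceEveryIteration(Map<String, Object> configsMap) {
    return Integer.valueOf(configsMap.getOrDefault(VALIDATE_ONCE_EVERY_ITR, 1).toString());
  }

  public static void main(String[] args) {
    Map<String, Object> nodeConfig = new HashMap<>();
    System.out.println(validateOnceEveryIteration(nodeConfig)); // 1: validate every iteration
    nodeConfig.put(VALIDATE_ONCE_EVERY_ITR, 5);                 // as set in the YAML configs above
    System.out.println(validateOnceEveryIteration(nodeConfig)); // 5
  }
}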
(next file; name not captured in this view)
@@ -74,81 +74,84 @@ public abstract Dataset<Row> getDatasetToValidate(SparkSession session, Executio

@Override
public void execute(ExecutionContext context, int curItrCount) throws Exception {

SparkSession session = SparkSession.builder().sparkContext(context.getJsc().sc()).getOrCreate();

// todo: Fix partitioning schemes. For now, assumes data based partitioning.
String inputPath = context.getHoodieTestSuiteWriter().getCfg().inputBasePath + "/*/*";
log.warn("Validation using data from input path " + inputPath);
// listing batches to be validated
String inputPathStr = context.getHoodieTestSuiteWriter().getCfg().inputBasePath;
if (log.isDebugEnabled()) {
FileSystem fs = new Path(inputPathStr)
.getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
FileStatus[] fileStatuses = fs.listStatus(new Path(inputPathStr));
log.info("fileStatuses length: " + fileStatuses.length);
for (FileStatus fileStatus : fileStatuses) {
log.debug("Listing all Micro batches to be validated :: " + fileStatus.getPath().toString());
}
}

Dataset<Row> inputSnapshotDf = getInputDf(context, session, inputPath);

// read from hudi and remove meta columns.
Dataset<Row> trimmedHudiDf = getDatasetToValidate(session, context, inputSnapshotDf.schema());
if (config.isValidateFullData()) {
log.debug("Validating full dataset");
Dataset<Row> exceptInputDf = inputSnapshotDf.except(trimmedHudiDf);
Dataset<Row> exceptHudiDf = trimmedHudiDf.except(inputSnapshotDf);
long exceptInputCount = exceptInputDf.count();
long exceptHudiCount = exceptHudiDf.count();
log.debug("Except input df count " + exceptInputDf + ", except hudi count " + exceptHudiCount);
if (exceptInputCount != 0 || exceptHudiCount != 0) {
log.error("Data set validation failed. Total count in hudi " + trimmedHudiDf.count() + ", input df count " + inputSnapshotDf.count()
+ ". InputDf except hudi df = " + exceptInputCount + ", Hudi df except Input df " + exceptHudiCount);
throw new AssertionError("Hudi contents does not match contents input data. ");
}
} else {
Dataset<Row> intersectionDf = inputSnapshotDf.intersect(trimmedHudiDf);
long inputCount = inputSnapshotDf.count();
long outputCount = trimmedHudiDf.count();
log.debug("Input count: " + inputCount + "; output count: " + outputCount);
// the intersected df should be same as inputDf. if not, there is some mismatch.
if (outputCount == 0 || inputCount == 0 || inputSnapshotDf.except(intersectionDf).count() != 0) {
log.error("Data set validation failed. Total count in hudi " + outputCount + ", input df count " + inputCount);
throw new AssertionError("Hudi contents does not match contents input data. ");
int validateOnceEveryItr = config.validateOnceEveryIteration();
int itrCountToExecute = config.getIterationCountToExecute();
if ((itrCountToExecute != -1 && itrCountToExecute == curItrCount) ||
(itrCountToExecute == -1 && ((curItrCount % validateOnceEveryItr) == 0))) {
SparkSession session = SparkSession.builder().sparkContext(context.getJsc().sc()).getOrCreate();
// todo: Fix partitioning schemes. For now, assumes data based partitioning.
String inputPath = context.getHoodieTestSuiteWriter().getCfg().inputBasePath + "/*/*";
log.warn("Validation using data from input path " + inputPath);
// listing batches to be validated
String inputPathStr = context.getHoodieTestSuiteWriter().getCfg().inputBasePath;
if (log.isDebugEnabled()) {
FileSystem fs = new Path(inputPathStr)
.getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
FileStatus[] fileStatuses = fs.listStatus(new Path(inputPathStr));
log.info("fileStatuses length: " + fileStatuses.length);
for (FileStatus fileStatus : fileStatuses) {
log.debug("Listing all Micro batches to be validated :: " + fileStatus.getPath().toString());
}
}

if (config.isValidateHive()) {
String database = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_DATABASE().key());
String tableName = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_TABLE().key());
log.warn("Validating hive table with db : " + database + " and table : " + tableName);
session.sql("REFRESH TABLE " + database + "." + tableName);
Dataset<Row> cowDf = session.sql("SELECT _row_key, rider, driver, begin_lat, begin_lon, end_lat, end_lon, fare, _hoodie_is_deleted, " +
"test_suite_source_ordering_field FROM " + database + "." + tableName);
Dataset<Row> reorderedInputDf = inputSnapshotDf.select("_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon", "fare",
"_hoodie_is_deleted", "test_suite_source_ordering_field");

Dataset<Row> intersectedHiveDf = reorderedInputDf.intersect(cowDf);
outputCount = trimmedHudiDf.count();
log.warn("Input count: " + inputCount + "; output count: " + outputCount);
Dataset<Row> inputSnapshotDf = getInputDf(context, session, inputPath);

// read from hudi and remove meta columns.
Dataset<Row> trimmedHudiDf = getDatasetToValidate(session, context, inputSnapshotDf.schema());
if (config.isValidateFullData()) {
log.debug("Validating full dataset");
Dataset<Row> exceptInputDf = inputSnapshotDf.except(trimmedHudiDf);
Dataset<Row> exceptHudiDf = trimmedHudiDf.except(inputSnapshotDf);
long exceptInputCount = exceptInputDf.count();
long exceptHudiCount = exceptHudiDf.count();
log.debug("Except input df count " + exceptInputDf + ", except hudi count " + exceptHudiCount);
if (exceptInputCount != 0 || exceptHudiCount != 0) {
log.error("Data set validation failed. Total count in hudi " + trimmedHudiDf.count() + ", input df count " + inputSnapshotDf.count()
+ ". InputDf except hudi df = " + exceptInputCount + ", Hudi df except Input df " + exceptHudiCount);
throw new AssertionError("Hudi contents does not match contents input data. ");
}
} else {
Dataset<Row> intersectionDf = inputSnapshotDf.intersect(trimmedHudiDf);
long inputCount = inputSnapshotDf.count();
long outputCount = trimmedHudiDf.count();
log.debug("Input count: " + inputCount + "; output count: " + outputCount);
// the intersected df should be same as inputDf. if not, there is some mismatch.
if (outputCount == 0 || reorderedInputDf.except(intersectedHiveDf).count() != 0) {
log.error("Data set validation failed for COW hive table. Total count in hudi " + outputCount + ", input df count " + inputCount);
throw new AssertionError("Hudi hive table contents does not match contents input data. ");
if (outputCount == 0 || inputCount == 0 || inputSnapshotDf.except(intersectionDf).count() != 0) {
log.error("Data set validation failed. Total count in hudi " + outputCount + ", input df count " + inputCount);
throw new AssertionError("Hudi contents does not match contents input data. ");
}
}

// if delete input data is enabled, erase input data.
if (config.isDeleteInputData()) {
// clean up input data for current group of writes.
inputPathStr = context.getHoodieTestSuiteWriter().getCfg().inputBasePath;
FileSystem fs = new Path(inputPathStr)
.getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
FileStatus[] fileStatuses = fs.listStatus(new Path(inputPathStr));
for (FileStatus fileStatus : fileStatuses) {
log.debug("Micro batch to be deleted " + fileStatus.getPath().toString());
fs.delete(fileStatus.getPath(), true);
if (config.isValidateHive()) {
String database = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_DATABASE().key());
String tableName = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_TABLE().key());
log.warn("Validating hive table with db : " + database + " and table : " + tableName);
session.sql("REFRESH TABLE " + database + "." + tableName);
Dataset<Row> cowDf = session.sql("SELECT _row_key, rider, driver, begin_lat, begin_lon, end_lat, end_lon, fare, _hoodie_is_deleted, " +
"test_suite_source_ordering_field FROM " + database + "." + tableName);
Dataset<Row> reorderedInputDf = inputSnapshotDf.select("_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon", "fare",
"_hoodie_is_deleted", "test_suite_source_ordering_field");

Dataset<Row> intersectedHiveDf = reorderedInputDf.intersect(cowDf);
outputCount = trimmedHudiDf.count();
log.warn("Input count: " + inputCount + "; output count: " + outputCount);
// the intersected df should be same as inputDf. if not, there is some mismatch.
if (outputCount == 0 || reorderedInputDf.except(intersectedHiveDf).count() != 0) {
log.error("Data set validation failed for COW hive table. Total count in hudi " + outputCount + ", input df count " + inputCount);
throw new AssertionError("Hudi hive table contents does not match contents input data. ");
}
}

// if delete input data is enabled, erase input data.
if (config.isDeleteInputData()) {
// clean up input data for current group of writes.
inputPathStr = context.getHoodieTestSuiteWriter().getCfg().inputBasePath;
FileSystem fs = new Path(inputPathStr)
.getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
FileStatus[] fileStatuses = fs.listStatus(new Path(inputPathStr));
for (FileStatus fileStatus : fileStatuses) {
log.debug("Micro batch to be deleted " + fileStatus.getPath().toString());
fs.delete(fileStatus.getPath(), true);
}
}
}
}
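To make the new gating easier to follow, here is a standalone sketch of just the predicate added at the top of execute(). Only the two config values and the iteration counter are kept; the class and method names are illustrative and not part of this PR, and the 1-based iteration counter in the demo is an assumption rather than something this diff shows.

// Illustrative sketch only: isolates the iteration-gating predicate used above.
public class ValidationGateSketch {

  // execute_itr_count pins validation to one specific iteration when set (!= -1);
  // otherwise validation runs on every validate_once_every_itr-th iteration.
  static boolean shouldValidate(int curItrCount, int itrCountToExecute, int validateOnceEveryItr) {
    return (itrCountToExecute != -1 && itrCountToExecute == curItrCount)
        || (itrCountToExecute == -1 && (curItrCount % validateOnceEveryItr) == 0);
  }

  public static void main(String[] args) {
    // execute_itr_count left unset (-1) and validate_once_every_itr set to 5,
    // matching the YAML configs earlier in this diff.
    for (int itr = 1; itr <= 10; itr++) {
      System.out.println("iteration " + itr + " -> validate = " + shouldValidate(itr, -1, 5));
    }
    // With a counter running 1..10, only iterations 5 and 10 print true.
  }
}

With the default validate_once_every_itr of 1, the modulo branch is true on every iteration, so ValidateDatasetNode configs that do not set the key keep validating after every write group as before.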