diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/SparkDataSourceContinuousIngestTool.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/SparkDataSourceContinuousIngestTool.java
new file mode 100644
index 0000000000000..c4f782fe40864
--- /dev/null
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/SparkDataSourceContinuousIngestTool.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.integ.testsuite;
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.utilities.IdentitySplitter;
+import org.apache.hudi.utilities.UtilHelpers;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tool that continuously ingests batches of source data into a Hudi table via the Spark datasource writer.
+ *
+ * Sample spark-submit command:
+ *
+ * ./bin/spark-submit --packages org.apache.spark:spark-avro_2.11:2.4.4 --driver-memory 4g --executor-memory 4g \
+ * --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.catalogImplementation=hive \
+ * --class org.apache.hudi.integ.testsuite.SparkDataSourceContinuousIngestTool \
+ * ${HUDI_ROOT_DIR}/packaging/hudi-integ-test-bundle/target/hudi-integ-test-bundle-0.11.0-SNAPSHOT.jar \
+ * --source-path file:${SOURCE_DIR}/spark_ds_continuous --checkpoint-file-path /tmp/hudi/checkpoint \
+ * --base-path file:///tmp/hudi/tbl_path/ --props /tmp/hudi_props.out
+ *
+ * Sample contents of the properties file passed via --props (/tmp/hudi_props.out above):
+ *
+ * hoodie.insert.shuffle.parallelism=4
+ * hoodie.upsert.shuffle.parallelism=4
+ * hoodie.bulkinsert.shuffle.parallelism=4
+ * hoodie.delete.shuffle.parallelism=4
+ * hoodie.datasource.write.recordkey.field=VendorID
+ * hoodie.datasource.write.partitionpath.field=date_col
+ * hoodie.datasource.write.operation=upsert
+ * hoodie.datasource.write.precombine.field=tpep_pickup_datetime
+ * hoodie.metadata.enable=false
+ * hoodie.table.name=hudi_tbl
+ */
+public class SparkDataSourceContinuousIngestTool {
+
+  private static final Logger LOG = LogManager.getLogger(SparkDataSourceContinuousIngestTool.class);
+
+  private final Config cfg;
+  // Properties with source, hoodie client, key generator etc.
+  private TypedProperties props;
+  private HoodieSparkEngineContext context;
+  private SparkSession sparkSession;
+
+  public SparkDataSourceContinuousIngestTool(JavaSparkContext jsc, Config cfg) {
+    if (cfg.propsFilePath != null) {
+      cfg.propsFilePath = FSUtils.addSchemeIfLocalPath(cfg.propsFilePath).toString();
+    }
+    this.context = new HoodieSparkEngineContext(jsc);
+    this.sparkSession = SparkSession.builder().config(jsc.getConf()).getOrCreate();
+    this.cfg = cfg;
+    this.props = cfg.propsFilePath == null
+        ? UtilHelpers.buildProperties(cfg.configs)
+        : readConfigFromFileSystem(jsc, cfg);
+  }
+
+  public static void main(String[] args) {
+    final Config cfg = new Config();
+    JCommander cmd = new JCommander(cfg, null, args);
+    if (cfg.help || args.length == 0) {
+      cmd.usage();
+      System.exit(1);
+    }
+    final JavaSparkContext jsc = UtilHelpers.buildSparkContext("spark-datasource-continuous-ingestion-tool", cfg.sparkMaster, cfg.sparkMemory);
+    try {
+      new SparkDataSourceContinuousIngestTool(jsc, cfg).run();
+    } catch (Throwable throwable) {
+      LOG.error("Failed to run spark datasource continuous ingestion for table at " + cfg.basePath, throwable);
+    } finally {
+      jsc.stop();
+    }
+  }
+
+  public void run() {
+    try {
+      SparkDataSourceContinuousIngest sparkDataSourceContinuousIngest =
+          new SparkDataSourceContinuousIngest(sparkSession, context.getHadoopConf().get(), new Path(cfg.sourcePath), cfg.sparkFormat,
+              new Path(cfg.checkpointFilePath), new Path(cfg.basePath), getPropsAsMap(props),
+              cfg.minSyncIntervalSeconds);
+      sparkDataSourceContinuousIngest.startIngestion();
+    } finally {
+      sparkSession.stop();
+      context.getJavaSparkContext().stop();
+    }
+  }
+
+  private Map<String, String> getPropsAsMap(TypedProperties typedProperties) {
+    Map<String, String> props = new HashMap<>();
+    typedProperties.entrySet().forEach(entry -> props.put(entry.getKey().toString(), entry.getValue().toString()));
+    return props;
+  }
+
+  /**
+   * Reads config from the file system.
+   *
+   * @param jsc {@link JavaSparkContext} instance.
+   * @param cfg {@link Config} instance.
+   * @return the {@link TypedProperties} instance.
+   */
+  private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, Config cfg) {
+    return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs)
+        .getProps(true);
+  }
+
+  public static class Config implements Serializable {
+    @Parameter(names = {"--source-path", "-sp"}, description = "Source path containing the batches of data to consume", required = true)
+    public String sourcePath = null;
+    @Parameter(names = {"--source-format", "-sf"}, description = "Source data format", required = false)
+    public String sparkFormat = "parquet";
+    @Parameter(names = {"--checkpoint-file-path", "-cpf"}, description = "Checkpoint file path to store/fetch checkpointing info", required = true)
+    public String checkpointFilePath = null;
+    @Parameter(names = {"--base-path", "-bp"}, description = "Base path for the hudi table", required = true)
+    public String basePath = null;
+    @Parameter(names = {"--spark-master", "-ms"}, description = "Spark master", required = false)
+    public String sparkMaster = null;
+    @Parameter(names = {"--spark-memory", "-sm"}, description = "Spark memory to use", required = false)
+    public String sparkMemory = "1g";
+    @Parameter(names = {"--min-sync-interval-seconds"},
+        description = "Minimum interval in seconds to wait between two consecutive ingestion batches in continuous mode")
+    public Integer minSyncIntervalSeconds = 0;
+    @Parameter(names = {"--help", "-h"}, help = true)
+    public Boolean help = false;
+
+    @Parameter(names = {"--props"}, description = "Path to a properties file on localfs or dfs, with configurations for "
+        + "the hoodie client used by the continuous ingestion")
+    public String propsFilePath = null;
+
+    @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file "
+        + "(using the CLI parameter \"--props\") can also be passed on the command line using this parameter. This can be repeated",
+        splitter = IdentitySplitter.class)
+    public List<String> configs = new ArrayList<>();
+  }
+}
diff --git a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/SparkDataSourceContinuousIngest.scala b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/SparkDataSourceContinuousIngest.scala
new file mode 100644
index 0000000000000..550ff9776f6d1
--- /dev/null
+++ b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/SparkDataSourceContinuousIngest.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.integ.testsuite
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
+import org.apache.log4j.LogManager
+import org.apache.spark.sql.{SaveMode, SparkSession}
+
+import java.io.Serializable
+
+class SparkDataSourceContinuousIngest(val spark: SparkSession, val conf: Configuration, val sourcePath: Path,
+                                      val sourceFormat: String, val checkpointFile: Path, hudiBasePath: Path,
+                                      hudiOptions: java.util.Map[String, String],
+                                      minSyncIntervalSeconds: Long) extends Serializable {
+
+  private val log = LogManager.getLogger(getClass)
+
+  def startIngestion(): Unit = {
+    val fs = sourcePath.getFileSystem(conf)
+    var orderedBatch: Array[FileStatus] = null
+    if (fs.exists(checkpointFile)) {
+      val checkpoint = spark.sparkContext.textFile(checkpointFile.toString).collect()(0)
+      log.info("Checkpoint file exists. Resuming from checkpoint " + checkpoint)
+
+      // only consume batches whose (numeric) directory name is greater than the checkpointed batch
+      orderedBatch = fetchListOfFilesToConsume(fs, sourcePath, new PathFilter {
+        override def accept(path: Path): Boolean = {
+          path.getName.toLong > checkpoint.toLong
+        }
+      })
+      if (log.isDebugEnabled) {
+        log.debug("List of batches to consume in order ")
+        orderedBatch.foreach(entry => log.debug(" " + entry.getPath.getName))
+      }
+    } else {
+      log.info("No checkpoint file exists. Starting from scratch ")
+      orderedBatch = fetchListOfFilesToConsume(fs, sourcePath, new PathFilter {
+        override def accept(path: Path): Boolean = {
+          true
+        }
+      })
+      if (log.isDebugEnabled) {
+        log.debug("List of batches to consume in order ")
+        orderedBatch.foreach(entry => log.debug(" " + entry.getPath.getName))
+      }
+    }
+
+    orderedBatch.foreach(entry => {
+      log.info("Consuming from batch " + entry.getPath.getName)
+      val pathToConsume = new Path(sourcePath.toString + "/" + entry.getPath.getName)
+      val df = spark.read.format(sourceFormat).load(pathToConsume.toString)
+
+      df.write.format("hudi").options(hudiOptions).mode(SaveMode.Append).save(hudiBasePath.toString)
+      writeToFile(checkpointFile, entry.getPath.getName, fs)
+      log.info("Completed batch " + entry.getPath.getName + ". Sleeping for " + minSyncIntervalSeconds + " secs before moving to the next batch")
+      Thread.sleep(minSyncIntervalSeconds * 1000)
+    })
+  }
+
+  def fetchListOfFilesToConsume(fs: FileSystem, basePath: Path, pathFilter: PathFilter): Array[FileStatus] = {
+    // batch directories are expected to have numeric names and are consumed in ascending order
+    val nextBatches = fs.listStatus(basePath, pathFilter)
+    nextBatches.sortBy(fileStatus => fileStatus.getPath.getName.toLong)
+  }
+
+  def writeToFile(checkpointFilePath: Path, str: String, fs: FileSystem): Unit = {
+    // create (or overwrite) the checkpoint file with the name of the last successfully ingested batch
+    val fsOutStream = fs.create(checkpointFilePath, true)
+    fsOutStream.writeBytes(str)
+    fsOutStream.flush()
+    fsOutStream.close()
+  }
+}
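
For context, here is a minimal sketch (not part of the patch) of how the Scala class above could be driven directly, e.g. from a spark-shell or a small test driver, instead of going through the spark-submit tool. It assumes a local source directory laid out as numerically named batch sub-directories (0/, 1/, 2/, ...) and reuses the writer options from the javadoc sample; the object name, paths and local master setting are illustrative assumptions, and the Hudi spark bundle must be on the classpath.

import org.apache.hadoop.fs.Path
import org.apache.hudi.integ.testsuite.SparkDataSourceContinuousIngest
import org.apache.spark.sql.SparkSession

import scala.collection.JavaConverters._

object SparkDataSourceContinuousIngestExample {
  def main(args: Array[String]): Unit = {
    // local SparkSession; in the real tool this comes from spark-submit
    val spark = SparkSession.builder()
      .appName("spark-datasource-continuous-ingest-example")
      .master("local[2]")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()

    // same writer configs as the sample properties file in the tool's javadoc
    val hudiOptions = Map(
      "hoodie.table.name" -> "hudi_tbl",
      "hoodie.datasource.write.operation" -> "upsert",
      "hoodie.datasource.write.recordkey.field" -> "VendorID",
      "hoodie.datasource.write.partitionpath.field" -> "date_col",
      "hoodie.datasource.write.precombine.field" -> "tpep_pickup_datetime"
    ).asJava

    new SparkDataSourceContinuousIngest(
      spark,
      spark.sparkContext.hadoopConfiguration,
      new Path("file:///tmp/source/spark_ds_continuous"), // contains batch dirs 0/, 1/, 2/, ...
      "parquet",
      new Path("file:///tmp/hudi/checkpoint"),            // stores the name of the last ingested batch
      new Path("file:///tmp/hudi/tbl_path"),
      hudiOptions,
      0L                                                  // seconds to sleep between batches
    ).startIngestion()

    spark.stop()
  }
}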