diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java index 8104ef7744fce..196b91849e23f 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java @@ -85,7 +85,7 @@ public static List getRecordsUsingInputFormat(Configuration conf, .collect(Collectors.toList())); return inputPaths.stream().map(path -> { - setInputPath(jobConf, path); + FileInputFormat.setInputPaths(jobConf, path); List records = new ArrayList<>(); try { List splits = Arrays.asList(inputFormat.getSplits(jobConf, 1)); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkDatasetTestUtils.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkDatasetTestUtils.java index 04a1712a2197a..00964033a678e 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkDatasetTestUtils.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkDatasetTestUtils.java @@ -142,8 +142,9 @@ public static Row getRandomValue(String partitionPath, boolean isError) { public static List toInternalRows(Dataset rows, ExpressionEncoder encoder) { List toReturn = new ArrayList<>(); List rowList = rows.collectAsList(); + ExpressionEncoder.Serializer serializer = encoder.createSerializer(); for (Row row : rowList) { - toReturn.add(encoder.toRow(row).copy()); + toReturn.add(serializer.apply(row).copy()); } return toReturn; } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java index ec62361d8c916..8381c799089a1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java @@ -166,7 +166,7 @@ private T executeRequest(String requestPath, Map queryParame break; } String content = response.returnContent().asString(); - return mapper.readValue(content, reference); + return (T) mapper.readValue(content, reference); } private Map getParamsWithPartitionPath(String partitionPath) { diff --git a/hudi-integ-test/pom.xml b/hudi-integ-test/pom.xml index e9fcc61e013b5..d5a97f7f37f4f 100644 --- a/hudi-integ-test/pom.xml +++ b/hudi-integ-test/pom.xml @@ -206,12 +206,12 @@ com.fasterxml.jackson.dataformat jackson-dataformat-yaml - 2.7.4 + ${fasterxml.version} com.fasterxml.jackson.core jackson-databind - 2.6.7.3 + ${fasterxml.version} @@ -220,11 +220,6 @@ jackson-annotations test - - com.fasterxml.jackson.core - jackson-databind - test - com.fasterxml.jackson.datatype jackson-datatype-guava diff --git a/hudi-spark/src/main/java/org/apache/hudi/internal/DefaultSource.java b/hudi-spark/src/main/java/org/apache/hudi/internal/DefaultSource.java deleted file mode 100644 index 5fb71df777df4..0000000000000 --- a/hudi-spark/src/main/java/org/apache/hudi/internal/DefaultSource.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.internal; - -import org.apache.hudi.DataSourceUtils; -import org.apache.hudi.config.HoodieWriteConfig; - -import org.apache.hadoop.conf.Configuration; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.SparkSession; -import org.apache.spark.sql.sources.DataSourceRegister; -import org.apache.spark.sql.sources.v2.DataSourceOptions; -import org.apache.spark.sql.sources.v2.DataSourceV2; -import org.apache.spark.sql.sources.v2.ReadSupport; -import org.apache.spark.sql.sources.v2.WriteSupport; -import org.apache.spark.sql.sources.v2.reader.DataSourceReader; -import org.apache.spark.sql.sources.v2.writer.DataSourceWriter; -import org.apache.spark.sql.types.StructType; - -import java.util.Optional; - -/** - * DataSource V2 implementation for managing internal write logic. Only called internally. - */ -public class DefaultSource implements DataSourceV2, ReadSupport, WriteSupport, - DataSourceRegister { - - private static final Logger LOG = LogManager - .getLogger(DefaultSource.class); - - private SparkSession sparkSession = null; - private Configuration configuration = null; - - @Override - public String shortName() { - return "hudi_internal"; - } - - @Override - public DataSourceReader createReader(StructType schema, DataSourceOptions options) { - return null; - } - - @Override - public DataSourceReader createReader(DataSourceOptions options) { - return null; - } - - @Override - public Optional createWriter(String writeUUID, StructType schema, SaveMode mode, - DataSourceOptions options) { - String instantTime = options.get(HoodieDataSourceInternalWriter.INSTANT_TIME_OPT_KEY).get(); - String path = options.get("path").get(); - String tblName = options.get(HoodieWriteConfig.TABLE_NAME).get(); - HoodieWriteConfig config = DataSourceUtils.createHoodieConfig(null, path, tblName, options.asMap()); - return Optional.of(new HoodieDataSourceInternalWriter(instantTime, config, schema, getSparkSession(), - getConfiguration())); - } - - private SparkSession getSparkSession() { - if (sparkSession == null) { - sparkSession = SparkSession.builder().getOrCreate(); - } - return sparkSession; - } - - private Configuration getConfiguration() { - if (configuration == null) { - this.configuration = getSparkSession().sparkContext().hadoopConfiguration(); - } - return configuration; - } -} diff --git a/hudi-spark/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriter.java b/hudi-spark/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriter.java deleted file mode 100644 index 7aa0fc6a3846f..0000000000000 --- a/hudi-spark/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriter.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.internal; - -import org.apache.hudi.client.HoodieInternalWriteStatus; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.io.HoodieRowCreateHandle; -import org.apache.hudi.table.HoodieTable; - -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; -import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.sources.v2.writer.DataWriter; -import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage; -import org.apache.spark.sql.types.StructType; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; - -/** - * Hoodie's Implementation of {@link DataWriter}. This is used in data source implementation for bulk insert. - */ -public class HoodieBulkInsertDataInternalWriter implements DataWriter { - - private static final long serialVersionUID = 1L; - private static final Logger LOG = LogManager.getLogger(HoodieBulkInsertDataInternalWriter.class); - - private final String instantTime; - private final int taskPartitionId; - private final long taskId; - private final long taskEpochId; - private final HoodieTable hoodieTable; - private final HoodieWriteConfig writeConfig; - private final StructType structType; - private final List writeStatusList = new ArrayList<>(); - - private HoodieRowCreateHandle handle; - private String lastKnownPartitionPath = null; - private String fileIdPrefix = null; - private int numFilesWritten = 0; - - public HoodieBulkInsertDataInternalWriter(HoodieTable hoodieTable, HoodieWriteConfig writeConfig, - String instantTime, int taskPartitionId, long taskId, long taskEpochId, - StructType structType) { - this.hoodieTable = hoodieTable; - this.writeConfig = writeConfig; - this.instantTime = instantTime; - this.taskPartitionId = taskPartitionId; - this.taskId = taskId; - this.taskEpochId = taskEpochId; - this.structType = structType; - this.fileIdPrefix = UUID.randomUUID().toString(); - } - - @Override - public void write(InternalRow record) throws IOException { - try { - String partitionPath = record.getUTF8String( - HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.PARTITION_PATH_METADATA_FIELD)).toString(); - - if ((lastKnownPartitionPath == null) || !lastKnownPartitionPath.equals(partitionPath) || !handle.canWrite()) { - LOG.info("Creating new file for partition path " + partitionPath); - createNewHandle(partitionPath); - lastKnownPartitionPath = partitionPath; - } - handle.write(record); - } catch (Throwable t) { - LOG.error("Global error thrown while trying to write records in HoodieRowCreateHandle ", t); - throw t; - } - } - - @Override - public WriterCommitMessage commit() throws IOException { - close(); - return new HoodieWriterCommitMessage(writeStatusList); - } - - @Override 
- public void abort() throws IOException { - } - - private void createNewHandle(String partitionPath) throws IOException { - if (null != handle) { - close(); - } - handle = new HoodieRowCreateHandle(hoodieTable, writeConfig, partitionPath, getNextFileId(), - instantTime, taskPartitionId, taskId, taskEpochId, structType); - } - - public void close() throws IOException { - if (null != handle) { - writeStatusList.add(handle.close()); - } - } - - protected String getNextFileId() { - return String.format("%s-%d", fileIdPrefix, numFilesWritten++); - } -} diff --git a/hudi-spark/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriterFactory.java b/hudi-spark/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriterFactory.java deleted file mode 100644 index 1dd0aa382cdd8..0000000000000 --- a/hudi-spark/src/main/java/org/apache/hudi/internal/HoodieBulkInsertDataInternalWriterFactory.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.internal; - -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.table.HoodieTable; - -import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.sources.v2.writer.DataWriter; -import org.apache.spark.sql.sources.v2.writer.DataWriterFactory; -import org.apache.spark.sql.types.StructType; - -/** - * Factory to assist in instantiating {@link HoodieBulkInsertDataInternalWriter}. - */ -public class HoodieBulkInsertDataInternalWriterFactory implements DataWriterFactory { - - private final String instantTime; - private final HoodieTable hoodieTable; - private final HoodieWriteConfig writeConfig; - private final StructType structType; - - public HoodieBulkInsertDataInternalWriterFactory(HoodieTable hoodieTable, HoodieWriteConfig writeConfig, - String instantTime, StructType structType) { - this.hoodieTable = hoodieTable; - this.writeConfig = writeConfig; - this.instantTime = instantTime; - this.structType = structType; - } - - @Override - public DataWriter createDataWriter(int partitionId, long taskId, long epochId) { - return new HoodieBulkInsertDataInternalWriter(hoodieTable, writeConfig, instantTime, partitionId, taskId, epochId, - structType); - } -} diff --git a/hudi-spark/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java b/hudi-spark/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java deleted file mode 100644 index e8cbff80a2c2e..0000000000000 --- a/hudi-spark/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.internal; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hudi.DataSourceUtils; -import org.apache.hudi.client.SparkRDDWriteClient; -import org.apache.hudi.client.common.HoodieSparkEngineContext; -import org.apache.hudi.common.model.HoodieWriteStat; -import org.apache.hudi.common.model.WriteOperationType; -import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.timeline.HoodieInstant.State; -import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.exception.HoodieException; -import org.apache.hudi.table.HoodieSparkTable; -import org.apache.hudi.table.HoodieTable; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.sql.SparkSession; -import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.sources.v2.writer.DataSourceWriter; -import org.apache.spark.sql.sources.v2.writer.DataWriterFactory; -import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage; -import org.apache.spark.sql.types.StructType; - -import java.util.Arrays; -import java.util.List; -import java.util.stream.Collectors; - -/** - * Implementation of {@link DataSourceWriter} for datasource "hudi.internal" to be used in datasource implementation - * of bulk insert. 
- */ -public class HoodieDataSourceInternalWriter implements DataSourceWriter { - - private static final long serialVersionUID = 1L; - private static final Logger LOG = LogManager.getLogger(HoodieDataSourceInternalWriter.class); - public static final String INSTANT_TIME_OPT_KEY = "hoodie.instant.time"; - - private final String instantTime; - private final HoodieTableMetaClient metaClient; - private final HoodieWriteConfig writeConfig; - private final StructType structType; - private final SparkRDDWriteClient writeClient; - private final HoodieTable hoodieTable; - private final WriteOperationType operationType; - - public HoodieDataSourceInternalWriter(String instantTime, HoodieWriteConfig writeConfig, StructType structType, - SparkSession sparkSession, Configuration configuration) { - this.instantTime = instantTime; - this.writeConfig = writeConfig; - this.structType = structType; - this.operationType = WriteOperationType.BULK_INSERT; - this.writeClient = new SparkRDDWriteClient<>(new HoodieSparkEngineContext(new JavaSparkContext(sparkSession.sparkContext())), writeConfig, true); - writeClient.setOperationType(operationType); - writeClient.startCommitWithTime(instantTime); - this.metaClient = new HoodieTableMetaClient(configuration, writeConfig.getBasePath()); - this.hoodieTable = HoodieSparkTable.create(writeConfig, new HoodieSparkEngineContext(new JavaSparkContext(sparkSession.sparkContext())), metaClient); - } - - @Override - public DataWriterFactory createWriterFactory() { - metaClient.getActiveTimeline().transitionRequestedToInflight( - new HoodieInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, instantTime), Option.empty()); - if (WriteOperationType.BULK_INSERT == operationType) { - return new HoodieBulkInsertDataInternalWriterFactory(hoodieTable, writeConfig, instantTime, structType); - } else { - throw new IllegalArgumentException("Write Operation Type + " + operationType + " not supported "); - } - } - - @Override - public boolean useCommitCoordinator() { - return true; - } - - @Override - public void onDataWriterCommit(WriterCommitMessage message) { - LOG.info("Received commit of a data writer =" + message); - } - - @Override - public void commit(WriterCommitMessage[] messages) { - List writeStatList = Arrays.stream(messages).map(m -> (HoodieWriterCommitMessage) m) - .flatMap(m -> m.getWriteStatuses().stream().map(m2 -> m2.getStat())).collect(Collectors.toList()); - - try { - writeClient.commitStats(instantTime, writeStatList, Option.empty(), - DataSourceUtils.getCommitActionType(operationType, metaClient.getTableType())); - } catch (Exception ioe) { - throw new HoodieException(ioe.getMessage(), ioe); - } finally { - writeClient.close(); - } - } - - @Override - public void abort(WriterCommitMessage[] messages) { - LOG.error("Commit " + instantTime + " aborted "); - writeClient.rollback(instantTime); - writeClient.close(); - } -} diff --git a/hudi-spark/src/main/java/org/apache/hudi/internal/HoodieWriterCommitMessage.java b/hudi-spark/src/main/java/org/apache/hudi/internal/HoodieWriterCommitMessage.java deleted file mode 100644 index 757000c57c1dd..0000000000000 --- a/hudi-spark/src/main/java/org/apache/hudi/internal/HoodieWriterCommitMessage.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.internal; - -import java.util.ArrayList; -import java.util.List; -import org.apache.hudi.client.HoodieInternalWriteStatus; -import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage; - -/** - * Hoodie's {@link WriterCommitMessage} used in datasource implementation. - */ -public class HoodieWriterCommitMessage implements WriterCommitMessage { - - private List writeStatuses = new ArrayList<>(); - - public HoodieWriterCommitMessage(List writeStatuses) { - this.writeStatuses = writeStatuses; - } - - public List getWriteStatuses() { - return writeStatuses; - } - - @Override - public String toString() { - return "HoodieWriterCommitMessage{" + "writeStatuses=" + writeStatuses + '}'; - } -} diff --git a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala index 70a135624ffd3..c215d91eb5e68 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala @@ -23,7 +23,7 @@ import org.apache.avro.Schema import org.apache.hudi.avro.HoodieAvroUtils import org.apache.spark.rdd.RDD import org.apache.spark.sql.avro.SchemaConverters -import org.apache.spark.sql.catalyst.encoders.RowEncoder +import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder} import org.apache.spark.sql.types._ import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} @@ -41,7 +41,8 @@ object AvroConversionUtils { // Use the Avro schema to derive the StructType which has the correct nullability information val dataType = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType] val encoder = RowEncoder.apply(dataType).resolveAndBind() - df.queryExecution.toRdd.map(encoder.fromRow) + val deserializer: ExpressionEncoder.Deserializer[Row] = encoder.createDeserializer() + df.queryExecution.toRdd.map(internalR => deserializer.apply(internalR)) .mapPartitions { records => if (records.isEmpty) Iterator.empty else { diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index f078c8bf67cce..8cd52551d32af 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -38,7 +38,6 @@ import org.apache.hudi.config.HoodieBootstrapConfig.{BOOTSTRAP_BASE_PATH_PROP, B import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.exception.HoodieException import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool} -import org.apache.hudi.internal.HoodieDataSourceInternalWriter import org.apache.hudi.sync.common.AbstractSyncTool import org.apache.log4j.LogManager import org.apache.spark.SparkContext @@ -118,15 +117,6 @@ private[hudi] object HoodieSparkSqlWriter { val commitActionType = 
DataSourceUtils.getCommitActionType(operation, tableConfig.getTableType) - // short-circuit if bulk_insert via row is enabled. - // scalastyle:off - if (parameters(ENABLE_ROW_WRITER_OPT_KEY).toBoolean) { - val (success, commitTime: common.util.Option[String]) = bulkInsertAsRow(sqlContext, parameters, df, tblName, - basePath, path, instantTime) - return (success, commitTime, common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig) - } - // scalastyle:on - val (writeResult, writeClient: SparkRDDWriteClient[HoodieRecordPayload[Nothing]]) = if (operation != WriteOperationType.DELETE) { // register classes & schemas @@ -269,31 +259,6 @@ private[hudi] object HoodieSparkSqlWriter { metaSyncSuccess } - def bulkInsertAsRow(sqlContext: SQLContext, - parameters: Map[String, String], - df: DataFrame, - tblName: String, - basePath: Path, - path: Option[String], - instantTime: String): (Boolean, common.util.Option[String]) = { - val structName = s"${tblName}_record" - val nameSpace = s"hoodie.${tblName}" - val writeConfig = DataSourceUtils.createHoodieConfig(null, path.get, tblName, mapAsJavaMap(parameters)) - val hoodieDF = HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, writeConfig, df, structName, nameSpace) - hoodieDF.write.format("org.apache.hudi.internal") - .option(HoodieDataSourceInternalWriter.INSTANT_TIME_OPT_KEY, instantTime) - .options(parameters) - .save() - val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean) - val metaSyncEnabled = parameters.get(META_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean) - val syncHiveSucess = if (hiveSyncEnabled || metaSyncEnabled) { - metaSync(parameters, basePath, sqlContext.sparkContext.hadoopConfiguration) - } else { - true - } - (syncHiveSucess, common.util.Option.ofNullable(instantTime)) - } - def toProperties(params: Map[String, String]): TypedProperties = { val props = new TypedProperties() params.foreach(kv => props.setProperty(kv._1, kv._2)) diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala index 26babd834b23d..f73e2d2707e82 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala @@ -20,7 +20,6 @@ package org.apache.hudi import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hudi.common.model.HoodieRecord -import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.datasources.{FileStatusCache, InMemoryFileIndex} import org.apache.spark.sql.types.{StringType, StructField, StructType} @@ -35,10 +34,35 @@ object HoodieSparkUtils { })) } + def isGlobPath(pattern: Path): Boolean = { + pattern.toString.exists("{}[]*?\\".toSet.contains) + } + + def globPath(fs: FileSystem, pattern: Path): Seq[Path] = { + Option(fs.globStatus(pattern)).map { statuses => + statuses.map(_.getPath.makeQualified(fs.getUri, fs.getWorkingDirectory)).toSeq + }.getOrElse(Seq.empty[Path]) + } + + def globPathIfNecessary(fs: FileSystem, pattern: Path): Seq[Path] = { + if (isGlobPath(pattern)) globPath(fs, pattern) else Seq(pattern) + } + + /** Following Spark org.apache.spark.deploy.SparkHadoopUtil implementation of handling glob patterns + * + * Checks to see whether input path contains a glob pattern and if yes, maps it to a list of absolute paths + * which match the glob pattern. 
Otherwise, returns original path + * + * @param paths List of absolute or globbed paths + * @param fs File system + * @return list of absolute file paths + * + */ + def checkAndGlobPathIfNecessary(paths: Seq[String], fs: FileSystem): Seq[Path] = { paths.flatMap(path => { val qualified = new Path(path).makeQualified(fs.getUri, fs.getWorkingDirectory) - val globPaths = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified) + val globPaths = globPathIfNecessary(fs, qualified) globPaths }) } diff --git a/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala index c1a6acdb04253..0b81fa7b804cf 100644 --- a/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala +++ b/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala @@ -113,9 +113,6 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext, hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf() ) - // Follow the implementation of Spark internal HadoopRDD to handle the broadcast configuration. - FileSystem.getLocal(jobConf) - SparkHadoopUtil.get.addCredentials(jobConf) val rdd = new HoodieMergeOnReadRDD( sqlContext.sparkContext, jobConf, diff --git a/hudi-spark/src/test/java/HoodieJavaStreamingApp.java b/hudi-spark/src/test/java/HoodieJavaStreamingApp.java index 606490f444e26..52389f6b15cf0 100644 --- a/hudi-spark/src/test/java/HoodieJavaStreamingApp.java +++ b/hudi-spark/src/test/java/HoodieJavaStreamingApp.java @@ -43,13 +43,13 @@ import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.streaming.DataStreamWriter; import org.apache.spark.sql.streaming.OutputMode; -import org.apache.spark.sql.streaming.ProcessingTime; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.apache.spark.sql.streaming.StreamingQuery; +import org.apache.spark.sql.streaming.Trigger; import static org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings; @@ -366,7 +366,7 @@ public void stream(Dataset streamingInput, String operationType, String che .outputMode(OutputMode.Append()); updateHiveSyncConfig(writer); - StreamingQuery query = writer.trigger(new ProcessingTime(500)).start(tablePath); + StreamingQuery query = writer.trigger(Trigger.ProcessingTime(500)).start(tablePath); query.awaitTermination(streamingDurationInMs); } diff --git a/hudi-spark/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java b/hudi-spark/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java deleted file mode 100644 index 5a5d8b2700d4e..0000000000000 --- a/hudi-spark/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java +++ /dev/null @@ -1,214 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.internal; - -import org.apache.hudi.client.HoodieInternalWriteStatus; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieWriteStat; -import org.apache.hudi.common.testutils.HoodieTestDataGenerator; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.table.HoodieSparkTable; -import org.apache.hudi.table.HoodieTable; -import org.apache.hudi.testutils.HoodieClientTestHarness; - -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.catalyst.InternalRow; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Random; - -import static org.apache.hudi.testutils.SparkDatasetTestUtils.ENCODER; -import static org.apache.hudi.testutils.SparkDatasetTestUtils.STRUCT_TYPE; -import static org.apache.hudi.testutils.SparkDatasetTestUtils.getConfigBuilder; -import static org.apache.hudi.testutils.SparkDatasetTestUtils.getInternalRowWithError; -import static org.apache.hudi.testutils.SparkDatasetTestUtils.getRandomRows; -import static org.apache.hudi.testutils.SparkDatasetTestUtils.toInternalRows; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; - -/** - * Unit tests {@link HoodieBulkInsertDataInternalWriter}. - */ -public class TestHoodieBulkInsertDataInternalWriter extends HoodieClientTestHarness { - - private static final Random RANDOM = new Random(); - - @BeforeEach - public void setUp() throws Exception { - initSparkContexts("TestHoodieBulkInsertDataInternalWriter"); - initPath(); - initFileSystem(); - initTestDataGenerator(); - initMetaClient(); - } - - @AfterEach - public void tearDown() throws Exception { - cleanupResources(); - } - - @Test - public void testDataInternalWriter() throws IOException { - // init config and table - HoodieWriteConfig cfg = getConfigBuilder(basePath).build(); - HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); - // execute N rounds - for (int i = 0; i < 5; i++) { - String instantTime = "00" + i; - // init writer - HoodieBulkInsertDataInternalWriter writer = new HoodieBulkInsertDataInternalWriter(table, cfg, instantTime, RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(), STRUCT_TYPE); - - int size = 10 + RANDOM.nextInt(1000); - // write N rows to partition1, N rows to partition2 and N rows to partition3 ... 
Each batch should create a new RowCreateHandle and a new file - int batches = 5; - Dataset totalInputRows = null; - - for (int j = 0; j < batches; j++) { - String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3]; - Dataset inputRows = getRandomRows(sqlContext, size, partitionPath, false); - writeRows(inputRows, writer); - if (totalInputRows == null) { - totalInputRows = inputRows; - } else { - totalInputRows = totalInputRows.union(inputRows); - } - } - - HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit(); - List fileAbsPaths = new ArrayList<>(); - List fileNames = new ArrayList<>(); - - // verify write statuses - assertWriteStatuses(commitMetadata.getWriteStatuses(), batches, size, fileAbsPaths, fileNames); - - // verify rows - Dataset result = sqlContext.read().parquet(fileAbsPaths.toArray(new String[0])); - assertOutput(totalInputRows, result, instantTime, fileNames); - } - } - - - /** - * Issue some corrupted or wrong schematized InternalRow after few valid InternalRows so that global error is thrown. write batch 1 of valid records write batch2 of invalid records which is expected - * to throw Global Error. Verify global error is set appropriately and only first batch of records are written to disk. - */ - @Test - public void testGlobalFailure() throws IOException { - // init config and table - HoodieWriteConfig cfg = getConfigBuilder(basePath).build(); - HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); - String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0]; - - String instantTime = "001"; - HoodieBulkInsertDataInternalWriter writer = new HoodieBulkInsertDataInternalWriter(table, cfg, instantTime, RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(), STRUCT_TYPE); - - int size = 10 + RANDOM.nextInt(100); - int totalFailures = 5; - // Generate first batch of valid rows - Dataset inputRows = getRandomRows(sqlContext, size / 2, partitionPath, false); - List internalRows = toInternalRows(inputRows, ENCODER); - - // generate some failures rows - for (int i = 0; i < totalFailures; i++) { - internalRows.add(getInternalRowWithError(partitionPath)); - } - - // generate 2nd batch of valid rows - Dataset inputRows2 = getRandomRows(sqlContext, size / 2, partitionPath, false); - internalRows.addAll(toInternalRows(inputRows2, ENCODER)); - - // issue writes - try { - for (InternalRow internalRow : internalRows) { - writer.write(internalRow); - } - fail("Should have failed"); - } catch (Throwable e) { - // expected - } - - HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit(); - - List fileAbsPaths = new ArrayList<>(); - List fileNames = new ArrayList<>(); - // verify write statuses - assertWriteStatuses(commitMetadata.getWriteStatuses(), 1, size / 2, fileAbsPaths, fileNames); - - // verify rows - Dataset result = sqlContext.read().parquet(fileAbsPaths.toArray(new String[0])); - assertOutput(inputRows, result, instantTime, fileNames); - } - - private void writeRows(Dataset inputRows, HoodieBulkInsertDataInternalWriter writer) throws IOException { - List internalRows = toInternalRows(inputRows, ENCODER); - // issue writes - for (InternalRow internalRow : internalRows) { - writer.write(internalRow); - } - } - - private void assertWriteStatuses(List writeStatuses, int batches, int size, List fileAbsPaths, List fileNames) { - assertEquals(batches, writeStatuses.size()); - int counter = 0; - for (HoodieInternalWriteStatus writeStatus : writeStatuses) { - // verify 
write status - assertEquals(writeStatus.getTotalRecords(), size); - assertNull(writeStatus.getGlobalError()); - assertEquals(writeStatus.getFailedRowsSize(), 0); - assertNotNull(writeStatus.getFileId()); - String fileId = writeStatus.getFileId(); - assertEquals(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[counter % 3], writeStatus.getPartitionPath()); - fileAbsPaths.add(basePath + "/" + writeStatus.getStat().getPath()); - fileNames.add(writeStatus.getStat().getPath().substring(writeStatus.getStat().getPath().lastIndexOf('/') + 1)); - HoodieWriteStat writeStat = writeStatus.getStat(); - assertEquals(size, writeStat.getNumInserts()); - assertEquals(size, writeStat.getNumWrites()); - assertEquals(fileId, writeStat.getFileId()); - assertEquals(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[counter++ % 3], writeStat.getPartitionPath()); - assertEquals(0, writeStat.getNumDeletes()); - assertEquals(0, writeStat.getNumUpdateWrites()); - assertEquals(0, writeStat.getTotalWriteErrors()); - } - } - - private void assertOutput(Dataset expectedRows, Dataset actualRows, String instantTime, List fileNames) { - // verify 3 meta fields that are filled in within create handle - actualRows.collectAsList().forEach(entry -> { - assertEquals(entry.get(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD)).toString(), instantTime); - assertFalse(entry.isNullAt(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.FILENAME_METADATA_FIELD))); - assertTrue(fileNames.contains(entry.get(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.FILENAME_METADATA_FIELD)))); - assertFalse(entry.isNullAt(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD))); - }); - - // after trimming 2 of the meta fields, rest of the fields should match - Dataset trimmedExpected = expectedRows.drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.FILENAME_METADATA_FIELD); - Dataset trimmedActual = actualRows.drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.FILENAME_METADATA_FIELD); - assertEquals(0, trimmedActual.except(trimmedExpected).count()); - } -} diff --git a/hudi-spark/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java b/hudi-spark/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java deleted file mode 100644 index 89d748f671aac..0000000000000 --- a/hudi-spark/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java +++ /dev/null @@ -1,321 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hudi.internal; - -import org.apache.hudi.client.HoodieInternalWriteStatus; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieWriteStat; -import org.apache.hudi.common.testutils.HoodieTestDataGenerator; -import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.testutils.HoodieClientTestHarness; -import org.apache.hudi.testutils.HoodieClientTestUtils; - -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.sources.v2.writer.DataWriter; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Random; - -import static org.apache.hudi.testutils.SparkDatasetTestUtils.ENCODER; -import static org.apache.hudi.testutils.SparkDatasetTestUtils.STRUCT_TYPE; -import static org.apache.hudi.testutils.SparkDatasetTestUtils.getConfigBuilder; -import static org.apache.hudi.testutils.SparkDatasetTestUtils.getRandomRows; -import static org.apache.hudi.testutils.SparkDatasetTestUtils.toInternalRows; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; - -/** - * Unit tests {@link HoodieDataSourceInternalWriter}. - */ -public class TestHoodieDataSourceInternalWriter extends HoodieClientTestHarness { - - private static final Random RANDOM = new Random(); - - @BeforeEach - public void setUp() throws Exception { - initSparkContexts("TestHoodieDataSourceInternalWriter"); - initPath(); - initFileSystem(); - initTestDataGenerator(); - initMetaClient(); - } - - @AfterEach - public void tearDown() throws Exception { - cleanupResources(); - } - - @Test - public void testDataSourceWriter() throws IOException { - // init config and table - HoodieWriteConfig cfg = getConfigBuilder(basePath).build(); - String instantTime = "001"; - // init writer - HoodieDataSourceInternalWriter dataSourceInternalWriter = - new HoodieDataSourceInternalWriter(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf); - DataWriter writer = dataSourceInternalWriter.createWriterFactory().createDataWriter(0, RANDOM.nextLong(), RANDOM.nextLong()); - - List partitionPaths = Arrays.asList(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS); - List partitionPathsAbs = new ArrayList<>(); - for (String partitionPath : partitionPaths) { - partitionPathsAbs.add(basePath + "/" + partitionPath + "/*"); - } - - int size = 10 + RANDOM.nextInt(1000); - int batches = 5; - Dataset totalInputRows = null; - - for (int j = 0; j < batches; j++) { - String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3]; - Dataset inputRows = getRandomRows(sqlContext, size, partitionPath, false); - writeRows(inputRows, writer); - if (totalInputRows == null) { - totalInputRows = inputRows; - } else { - totalInputRows = totalInputRows.union(inputRows); - } - } - - HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit(); - List commitMessages = new ArrayList<>(); - commitMessages.add(commitMetadata); - dataSourceInternalWriter.commit(commitMessages.toArray(new HoodieWriterCommitMessage[0])); - metaClient.reloadActiveTimeline(); - Dataset result = 
HoodieClientTestUtils.read(jsc, basePath, sqlContext, metaClient.getFs(), partitionPathsAbs.toArray(new String[0])); - // verify output - assertOutput(totalInputRows, result, instantTime); - assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size); - } - - @Test - public void testMultipleDataSourceWrites() throws IOException { - // init config and table - HoodieWriteConfig cfg = getConfigBuilder(basePath).build(); - int partitionCounter = 0; - - // execute N rounds - for (int i = 0; i < 5; i++) { - String instantTime = "00" + i; - // init writer - HoodieDataSourceInternalWriter dataSourceInternalWriter = - new HoodieDataSourceInternalWriter(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf); - - List commitMessages = new ArrayList<>(); - Dataset totalInputRows = null; - DataWriter writer = dataSourceInternalWriter.createWriterFactory().createDataWriter(partitionCounter++, RANDOM.nextLong(), RANDOM.nextLong()); - - int size = 10 + RANDOM.nextInt(1000); - int batches = 5; // one batch per partition - - for (int j = 0; j < batches; j++) { - String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3]; - Dataset inputRows = getRandomRows(sqlContext, size, partitionPath, false); - writeRows(inputRows, writer); - if (totalInputRows == null) { - totalInputRows = inputRows; - } else { - totalInputRows = totalInputRows.union(inputRows); - } - } - - HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit(); - commitMessages.add(commitMetadata); - dataSourceInternalWriter.commit(commitMessages.toArray(new HoodieWriterCommitMessage[0])); - metaClient.reloadActiveTimeline(); - - Dataset result = HoodieClientTestUtils.readCommit(basePath, sqlContext, metaClient.getCommitTimeline(), instantTime); - - // verify output - assertOutput(totalInputRows, result, instantTime); - assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size); - } - } - - @Test - public void testLargeWrites() throws IOException { - // init config and table - HoodieWriteConfig cfg = getConfigBuilder(basePath).build(); - int partitionCounter = 0; - - // execute N rounds - for (int i = 0; i < 3; i++) { - String instantTime = "00" + i; - // init writer - HoodieDataSourceInternalWriter dataSourceInternalWriter = - new HoodieDataSourceInternalWriter(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf); - - List commitMessages = new ArrayList<>(); - Dataset totalInputRows = null; - DataWriter writer = dataSourceInternalWriter.createWriterFactory().createDataWriter(partitionCounter++, RANDOM.nextLong(), RANDOM.nextLong()); - - int size = 10000 + RANDOM.nextInt(10000); - int batches = 3; // one batch per partition - - for (int j = 0; j < batches; j++) { - String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3]; - Dataset inputRows = getRandomRows(sqlContext, size, partitionPath, false); - writeRows(inputRows, writer); - if (totalInputRows == null) { - totalInputRows = inputRows; - } else { - totalInputRows = totalInputRows.union(inputRows); - } - } - - HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit(); - commitMessages.add(commitMetadata); - dataSourceInternalWriter.commit(commitMessages.toArray(new HoodieWriterCommitMessage[0])); - metaClient.reloadActiveTimeline(); - - Dataset result = HoodieClientTestUtils.readCommit(basePath, sqlContext, metaClient.getCommitTimeline(), instantTime); - - // verify output - assertOutput(totalInputRows, result, instantTime); - 
assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size); - } - } - - /** - * Tests that DataSourceWriter.abort() will abort the written records of interest write and commit batch1 write and abort batch2 Read of entire dataset should show only records from batch1. - * commit batch1 - * abort batch2 - * verify only records from batch1 is available to read - */ - @Test - public void testAbort() throws IOException { - // init config and table - HoodieWriteConfig cfg = getConfigBuilder(basePath).build(); - - String instantTime0 = "00" + 0; - // init writer - HoodieDataSourceInternalWriter dataSourceInternalWriter = - new HoodieDataSourceInternalWriter(instantTime0, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf); - DataWriter writer = dataSourceInternalWriter.createWriterFactory().createDataWriter(0, RANDOM.nextLong(), RANDOM.nextLong()); - - List partitionPaths = Arrays.asList(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS); - List partitionPathsAbs = new ArrayList<>(); - for (String partitionPath : partitionPaths) { - partitionPathsAbs.add(basePath + "/" + partitionPath + "/*"); - } - - int size = 10 + RANDOM.nextInt(100); - int batches = 1; - Dataset totalInputRows = null; - - for (int j = 0; j < batches; j++) { - String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3]; - Dataset inputRows = getRandomRows(sqlContext, size, partitionPath, false); - writeRows(inputRows, writer); - if (totalInputRows == null) { - totalInputRows = inputRows; - } else { - totalInputRows = totalInputRows.union(inputRows); - } - } - - HoodieWriterCommitMessage commitMetadata = (HoodieWriterCommitMessage) writer.commit(); - List commitMessages = new ArrayList<>(); - commitMessages.add(commitMetadata); - // commit 1st batch - dataSourceInternalWriter.commit(commitMessages.toArray(new HoodieWriterCommitMessage[0])); - metaClient.reloadActiveTimeline(); - Dataset result = HoodieClientTestUtils.read(jsc, basePath, sqlContext, metaClient.getFs(), partitionPathsAbs.toArray(new String[0])); - // verify rows - assertOutput(totalInputRows, result, instantTime0); - assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size); - - // 2nd batch. 
abort in the end - String instantTime1 = "00" + 1; - dataSourceInternalWriter = - new HoodieDataSourceInternalWriter(instantTime1, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf); - writer = dataSourceInternalWriter.createWriterFactory().createDataWriter(1, RANDOM.nextLong(), RANDOM.nextLong()); - - for (int j = 0; j < batches; j++) { - String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3]; - Dataset inputRows = getRandomRows(sqlContext, size, partitionPath, false); - writeRows(inputRows, writer); - } - - commitMetadata = (HoodieWriterCommitMessage) writer.commit(); - commitMessages = new ArrayList<>(); - commitMessages.add(commitMetadata); - // commit 1st batch - dataSourceInternalWriter.abort(commitMessages.toArray(new HoodieWriterCommitMessage[0])); - metaClient.reloadActiveTimeline(); - result = HoodieClientTestUtils.read(jsc, basePath, sqlContext, metaClient.getFs(), partitionPathsAbs.toArray(new String[0])); - // verify rows - // only rows from first batch should be present - assertOutput(totalInputRows, result, instantTime0); - } - - private void writeRows(Dataset inputRows, DataWriter writer) throws IOException { - List internalRows = toInternalRows(inputRows, ENCODER); - // issue writes - for (InternalRow internalRow : internalRows) { - writer.write(internalRow); - } - } - - private void assertWriteStatuses(List writeStatuses, int batches, int size) { - assertEquals(batches, writeStatuses.size()); - int counter = 0; - for (HoodieInternalWriteStatus writeStatus : writeStatuses) { - assertEquals(writeStatus.getPartitionPath(), HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[counter % 3]); - assertEquals(writeStatus.getTotalRecords(), size); - assertEquals(writeStatus.getFailedRowsSize(), 0); - assertEquals(writeStatus.getTotalErrorRecords(), 0); - assertFalse(writeStatus.hasErrors()); - assertNull(writeStatus.getGlobalError()); - assertNotNull(writeStatus.getFileId()); - String fileId = writeStatus.getFileId(); - HoodieWriteStat writeStat = writeStatus.getStat(); - assertEquals(size, writeStat.getNumInserts()); - assertEquals(size, writeStat.getNumWrites()); - assertEquals(fileId, writeStat.getFileId()); - assertEquals(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[counter++ % 3], writeStat.getPartitionPath()); - assertEquals(0, writeStat.getNumDeletes()); - assertEquals(0, writeStat.getNumUpdateWrites()); - assertEquals(0, writeStat.getTotalWriteErrors()); - } - } - - private void assertOutput(Dataset expectedRows, Dataset actualRows, String instantTime) { - // verify 3 meta fields that are filled in within create handle - actualRows.collectAsList().forEach(entry -> { - assertEquals(entry.get(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD)).toString(), instantTime); - assertFalse(entry.isNullAt(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.FILENAME_METADATA_FIELD))); - assertFalse(entry.isNullAt(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD))); - }); - - // after trimming 2 of the meta fields, rest of the fields should match - Dataset trimmedExpected = expectedRows.drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.FILENAME_METADATA_FIELD); - Dataset trimmedActual = actualRows.drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.FILENAME_METADATA_FIELD); - assertEquals(0, trimmedActual.except(trimmedExpected).count()); - } -} diff --git 
a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala index 226cf5313f5d0..7a902c14b5f23 100644 --- a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala +++ b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala @@ -26,7 +26,7 @@ import org.apache.hudi.testutils.HoodieClientTestBase import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers} import org.apache.log4j.LogManager import org.apache.spark.sql._ -import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime} +import org.apache.spark.sql.streaming.{OutputMode, Trigger} import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} @@ -93,7 +93,7 @@ class TestStructuredStreaming extends HoodieClientTestBase { .writeStream .format("org.apache.hudi") .options(commonOpts) - .trigger(new ProcessingTime(100)) + .trigger(Trigger.ProcessingTime(100)) .option("checkpointLocation", basePath + "/checkpoint") .outputMode(OutputMode.Append) .start(destPath) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java index e76790918754a..5af5b7174a446 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java @@ -54,7 +54,6 @@ import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import org.apache.spark.Accumulator; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; @@ -67,6 +66,7 @@ import org.apache.spark.sql.jdbc.JdbcDialect; import org.apache.spark.sql.jdbc.JdbcDialects; import org.apache.spark.sql.types.StructType; +import org.apache.spark.util.LongAccumulator; import java.io.BufferedReader; import java.io.IOException; @@ -277,7 +277,7 @@ public static SparkRDDWriteClient createHoodieClient(JavaSparkContext jsc, Strin } public static int handleErrors(JavaSparkContext jsc, String instantTime, JavaRDD writeResponse) { - Accumulator errors = jsc.accumulator(0); + LongAccumulator errors = jsc.sc().longAccumulator(); writeResponse.foreach(writeStatus -> { if (writeStatus.hasErrors()) { errors.add(1); diff --git a/packaging/hudi-utilities-bundle/pom.xml b/packaging/hudi-utilities-bundle/pom.xml index 39e48bbb50ba4..d8232d77d12e9 100644 --- a/packaging/hudi-utilities-bundle/pom.xml +++ b/packaging/hudi-utilities-bundle/pom.xml @@ -105,6 +105,7 @@ io.prometheus:simpleclient_common com.yammer.metrics:metrics-core org.apache.spark:spark-streaming-kafka-0-10_${scala.binary.version} + org.apache.spark:spark-token-provider-kafka-0-10_${scala.binary.version} org.apache.kafka:kafka_${scala.binary.version} com.101tec:zkclient org.apache.kafka:kafka-clients diff --git a/pom.xml b/pom.xml index 2c786cfad81bc..8494de9c8aa9f 100644 --- a/pom.xml +++ b/pom.xml @@ -81,8 +81,8 @@ 3.0.0-M1 1.8 - 2.6.7 - 2.0.0 + 2.10.0 + 2.4.1 2.17 1.10.1 5.7.0-M1 @@ -99,10 +99,10 @@ 4.1.1 0.8.0 4.4.1 - 2.4.4 + 3.0.0 1.8.2 - 2.11.12 - 2.11 + 2.12.10 + 2.12 0.12 3.3.1 3.0.1 @@ -429,7 +429,7 @@ com.fasterxml.jackson.core jackson-databind - ${fasterxml.version}.3 + ${fasterxml.version} com.fasterxml.jackson.datatype @@ -439,7 +439,7 @@ com.fasterxml.jackson.module 
jackson-module-scala_${scala.binary.version} - ${fasterxml.version}.1 + ${fasterxml.version} diff --git a/scripts/run_travis_tests.sh b/scripts/run_travis_tests.sh index 63fb959c966af..01ede179f6677 100755 --- a/scripts/run_travis_tests.sh +++ b/scripts/run_travis_tests.sh @@ -18,16 +18,16 @@ mode=$1 modules=$2 -sparkVersion=2.4.4 +sparkVersion=3.0.0 hadoopVersion=2.7 if [ "$mode" = "unit" ]; then - mvn clean install -DskipTests -q + mvn clean install -Pscala-2.12 -DskipTests -q echo "Running Unit Tests" - mvn test -Punit-tests -pl "$modules" -B + mvn test -Pscala-2.12 -Punit-tests -pl "$modules" -B elif [ "$mode" = "functional" ]; then echo "Running Functional Tests" - mvn test -Pfunctional-tests -B + mvn test -Pscala-2.12 -Pfunctional-tests -B elif [ "$mode" = "integration" ]; then echo "Downloading Apache Spark-${sparkVersion}-bin-hadoop${hadoopVersion}" wget http://archive.apache.org/dist/spark/spark-${sparkVersion}/spark-${sparkVersion}-bin-hadoop${hadoopVersion}.tgz -O /tmp/spark-${sparkVersion}.tgz @@ -35,7 +35,7 @@ elif [ "$mode" = "integration" ]; then export SPARK_HOME=$PWD/spark-${sparkVersion}-bin-hadoop${hadoopVersion} mkdir /tmp/spark-events/ echo "Running Integration Tests" - mvn verify -Pintegration-tests -B + mvn verify -Pscala-2.12 -Pintegration-tests -B else echo "Unknown mode $mode" exit 1
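
Note on the encoder changes in SparkDatasetTestUtils.java and AvroConversionUtils.scala above: Spark 3.0 removed ExpressionEncoder#toRow and #fromRow in favor of createSerializer()/createDeserializer(), which is also why the DataSourceV2 classes under org.apache.hudi.internal (built against the removed org.apache.spark.sql.sources.v2 interfaces) are deleted here rather than ported. The following is a minimal, self-contained Scala sketch of the new encoder round trip; the two-column schema and the sample values are made up for illustration and are not taken from this PR.

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.catalyst.encoders.RowEncoder
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    object EncoderMigrationSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[1]").appName("encoder-sketch").getOrCreate()
        // Hypothetical schema, used only to exercise the Spark 3 encoder API.
        val schema = StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
        val encoder = RowEncoder.apply(schema).resolveAndBind()

        val serializer = encoder.createSerializer()     // Row -> InternalRow, replaces encoder.toRow
        val deserializer = encoder.createDeserializer() // InternalRow -> Row, replaces encoder.fromRow

        // copy() matters when collecting results: the serializer may reuse the same InternalRow instance,
        // which is why toInternalRows() in the patch copies each serialized row.
        val internalRow = serializer.apply(Row(1, "a")).copy()
        val roundTripped = deserializer.apply(internalRow)
        println(roundTripped) // [1,a]
        spark.stop()
      }
    }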
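
Similarly, for the streaming and accumulator changes (HoodieJavaStreamingApp, TestStructuredStreaming, UtilHelpers): Spark 3.0 dropped the ProcessingTime case class and the old Accumulator API. Below is a hedged sketch of the replacements used in the patch, assuming nothing beyond a local SparkSession; the sample RDD is illustrative only.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.Trigger

    object Spark3ApiSwapsSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[1]").appName("api-swaps-sketch").getOrCreate()

        // Trigger.ProcessingTime(500) replaces new ProcessingTime(500) as the micro-batch interval.
        val trigger = Trigger.ProcessingTime(500)
        println(trigger)

        // LongAccumulator replaces the removed Accumulator[Int] used for error counting in handleErrors().
        val errors = spark.sparkContext.longAccumulator("errors")
        spark.sparkContext.parallelize(Seq(true, false, true))
          .foreach(hasError => if (hasError) errors.add(1))
        println(s"errors = ${errors.value}") // errors = 2
        spark.stop()
      }
    }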
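
Finally, a usage sketch for the glob helpers added to HoodieSparkUtils above; they replicate what SparkHadoopUtil.globPathIfNecessary did, since that class is private to Spark and its import had to be dropped. The helper bodies mirror the code in the patch; the input pattern below is a made-up local path, not one referenced by the PR.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    object GlobPathUsageSketch {
      // Same logic as the helpers added to HoodieSparkUtils in this PR.
      def isGlobPath(pattern: Path): Boolean =
        pattern.toString.exists("{}[]*?\\".toSet.contains)

      def globPathIfNecessary(fs: FileSystem, pattern: Path): Seq[Path] =
        if (isGlobPath(pattern)) {
          // globStatus returns null when nothing matches, hence the Option wrapper.
          Option(fs.globStatus(pattern))
            .map(_.map(_.getPath.makeQualified(fs.getUri, fs.getWorkingDirectory)).toSeq)
            .getOrElse(Seq.empty[Path])
        } else {
          Seq(pattern)
        }

      def main(args: Array[String]): Unit = {
        val fs = FileSystem.get(new Configuration())
        // "/tmp/hudi_input/*" is an illustrative pattern; any absolute or globbed path works.
        val qualified = new Path("/tmp/hudi_input/*").makeQualified(fs.getUri, fs.getWorkingDirectory)
        globPathIfNecessary(fs, qualified).foreach(println)
      }
    }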