@@ -31,6 +31,7 @@
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;

import org.apache.avro.Schema;
@@ -211,4 +212,22 @@ public static Stream<GenericRecord> readHFile(JavaSparkContext jsc, String[] pat
return valuesAsList.stream();
}

public static Option<HoodieCommitMetadata> getCommitMetadataForLatestInstant(HoodieTableMetaClient metaClient) {
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
if (timeline.lastInstant().isPresent()) {
return getCommitMetadataForInstant(metaClient, timeline.lastInstant().get());
} else {
return Option.empty();
}
}

private static Option<HoodieCommitMetadata> getCommitMetadataForInstant(HoodieTableMetaClient metaClient, HoodieInstant instant) {
try {
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
byte[] data = timeline.getInstantDetails(instant).get();
return Option.of(HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class));
} catch (Exception e) {
throw new HoodieException("Failed to read commit metadata for instant " + instant, e);
}
}
}
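For reference, a minimal sketch of how the new helper could be used to inspect what a commit recorded. The class name, method name, base-path argument, and the assumption that HoodieClientTestUtils lives in org.apache.hudi.testutils are illustrative, not part of this PR.

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.testutils.HoodieClientTestUtils;

// Illustrative only: prints the extra metadata stored by the latest completed commit.
public class LatestCommitMetadataExample {
  public static void printLatestExtraMetadata(Configuration hadoopConf, String basePath) {
    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
        .setConf(hadoopConf).setBasePath(basePath).build();
    Option<HoodieCommitMetadata> latest =
        HoodieClientTestUtils.getCommitMetadataForLatestInstant(metaClient);
    if (latest.isPresent()) {
      // every key written via the commit-metadata key prefix shows up here
      latest.get().getExtraMetadata().forEach((k, v) -> System.out.println(k + " -> " + v));
    }
  }
}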
========== next changed file ==========
@@ -55,6 +55,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@@ -132,6 +133,18 @@ public static HoodieRecordPayload createPayload(String payloadClass, GenericReco
}
}

public static Map<String, String> getExtraMetadata(Map<String, String> properties) {
Map<String, String> extraMetadataMap = new HashMap<>();
if (properties.containsKey(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX_OPT_KEY().key())) {
String metadataKeyPrefix = properties.get(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX_OPT_KEY().key());
properties.forEach((key, value) -> {
if (key.startsWith(metadataKeyPrefix)) {
extraMetadataMap.put(key, value);
}
});
}
return extraMetadataMap;
}
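As a quick illustration of how getExtraMetadata filters write options (the map contents below are made up, not taken from the PR): only entries whose keys start with the value configured under COMMIT_METADATA_KEYPREFIX_OPT_KEY are copied into the commit's extra metadata; the prefix option itself is left out because its own key does not match the prefix.

import java.util.HashMap;
import java.util.Map;

import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.DataSourceWriteOptions;

// Illustrative only: shows which write options survive the prefix filter.
public class ExtraMetadataFilterExample {
  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX_OPT_KEY().key(), "commit_extra_meta_");
    props.put("commit_extra_meta_job", "nightly-batch");           // kept: matches the prefix
    props.put("hoodie.datasource.write.operation", "bulk_insert"); // dropped: no prefix match

    Map<String, String> extra = DataSourceUtils.getExtraMetadata(props);
    System.out.println(extra); // {commit_extra_meta_job=nightly-batch}
  }
}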

/**
* Create a payload class via reflection without setting the ordering/precombine value.
*/
========== next changed file ==========
@@ -38,8 +38,8 @@
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Helper class for HoodieDataSourceInternalWriter used by Spark datasource v2.
@@ -54,14 +54,17 @@ public class DataSourceInternalWriterHelper {
private final SparkRDDWriteClient writeClient;
private final HoodieTable hoodieTable;
private final WriteOperationType operationType;
private Map<String, String> extraMetadata;

public DataSourceInternalWriterHelper(String instantTime, HoodieWriteConfig writeConfig, StructType structType,
SparkSession sparkSession, Configuration configuration) {
SparkSession sparkSession, Configuration configuration, Map<String, String> extraMetadata) {
this.instantTime = instantTime;
this.operationType = WriteOperationType.BULK_INSERT;
this.extraMetadata = extraMetadata;
this.writeClient = new SparkRDDWriteClient<>(new HoodieSparkEngineContext(new JavaSparkContext(sparkSession.sparkContext())), writeConfig);
writeClient.setOperationType(operationType);
writeClient.startCommitWithTime(instantTime);

this.metaClient = HoodieTableMetaClient.builder().setConf(configuration).setBasePath(writeConfig.getBasePath()).build();
this.hoodieTable = HoodieSparkTable.create(writeConfig, new HoodieSparkEngineContext(new JavaSparkContext(sparkSession.sparkContext())), metaClient);
}
@@ -76,7 +79,7 @@ public void onDataWriterCommit(String message) {

public void commit(List<HoodieWriteStat> writeStatList) {
try {
writeClient.commitStats(instantTime, writeStatList, Option.of(new HashMap<>()),
writeClient.commitStats(instantTime, writeStatList, Option.of(extraMetadata),
CommitUtils.getCommitActionType(operationType, metaClient.getTableType()));
} catch (Exception ioe) {
throw new HoodieException(ioe.getMessage(), ioe);
========== next changed file ==========
@@ -67,6 +67,6 @@ public Optional<DataSourceWriter> createWriter(String writeUUID, StructType sche
options.get(HoodieInternalConfig.BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED).isPresent()
? options.get(HoodieInternalConfig.BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED).get() : null);
return Optional.of(new HoodieDataSourceInternalWriter(instantTime, config, schema, getSparkSession(),
getConfiguration(), arePartitionRecordsSorted));
getConfiguration(), options, arePartitionRecordsSorted));
}
}
========== next changed file ==========
@@ -18,6 +18,7 @@

package org.apache.hudi.internal;

import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.client.HoodieInternalWriteStatus;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
@@ -26,13 +27,16 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
import org.apache.spark.sql.types.StructType;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
@@ -46,15 +50,18 @@ public class HoodieDataSourceInternalWriter implements DataSourceWriter {
private final StructType structType;
private final DataSourceInternalWriterHelper dataSourceInternalWriterHelper;
private final Boolean arePartitionRecordsSorted;
private Map<String, String> extraMetadataMap = new HashMap<>();

public HoodieDataSourceInternalWriter(String instantTime, HoodieWriteConfig writeConfig, StructType structType,
SparkSession sparkSession, Configuration configuration, boolean arePartitionRecordsSorted) {
SparkSession sparkSession, Configuration configuration, DataSourceOptions dataSourceOptions,
boolean arePartitionRecordsSorted) {
this.instantTime = instantTime;
this.writeConfig = writeConfig;
this.structType = structType;
this.arePartitionRecordsSorted = arePartitionRecordsSorted;
this.extraMetadataMap = DataSourceUtils.getExtraMetadata(dataSourceOptions.asMap());
this.dataSourceInternalWriterHelper = new DataSourceInternalWriterHelper(instantTime, writeConfig, structType,
sparkSession, configuration);
sparkSession, configuration, extraMetadataMap);
}

@Override
@@ -89,4 +96,5 @@ public void commit(WriterCommitMessage[] messages) {
public void abort(WriterCommitMessage[] messages) {
dataSourceInternalWriterHelper.abort();
}

}
========== next changed file ==========
@@ -19,6 +19,7 @@
package org.apache.hudi.internal;

import org.apache.hudi.client.HoodieInternalWriteStatus;

import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;

import java.util.List;
========== next changed file ==========
@@ -18,6 +18,8 @@

package org.apache.hudi.internal;

import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -26,18 +28,24 @@
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.writer.DataWriter;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.hudi.testutils.SparkDatasetTestUtils.ENCODER;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.STRUCT_TYPE;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.getConfigBuilder;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.getRandomRows;
import static org.apache.hudi.testutils.SparkDatasetTestUtils.toInternalRows;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
* Unit tests {@link HoodieDataSourceInternalWriter}.
@@ -47,12 +55,16 @@ public class TestHoodieDataSourceInternalWriter extends

@Test
public void testDataSourceWriter() throws Exception {
testDataSourceWriterInternal(Collections.EMPTY_MAP, Collections.EMPTY_MAP);
}

private void testDataSourceWriterInternal(Map<String, String> extraMetadata, Map<String, String> expectedExtraMetadata) throws Exception {
// init config and table
HoodieWriteConfig cfg = getConfigBuilder(basePath).build();
String instantTime = "001";
// init writer
HoodieDataSourceInternalWriter dataSourceInternalWriter =
new HoodieDataSourceInternalWriter(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, false);
new HoodieDataSourceInternalWriter(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, new DataSourceOptions(extraMetadata), false);
DataWriter<InternalRow> writer = dataSourceInternalWriter.createWriterFactory().createDataWriter(0, RANDOM.nextLong(), RANDOM.nextLong());

String[] partitionPaths = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS;
@@ -64,7 +76,6 @@ public void testDataSourceWriter() throws Exception {
int size = 10 + RANDOM.nextInt(1000);
int batches = 5;
Dataset<Row> totalInputRows = null;

for (int j = 0; j < batches; j++) {
String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3];
Dataset<Row> inputRows = getRandomRows(sqlContext, size, partitionPath, false);
@@ -80,11 +91,50 @@ public void testDataSourceWriter() throws Exception {
List<HoodieWriterCommitMessage> commitMessages = new ArrayList<>();
commitMessages.add(commitMetadata);
dataSourceInternalWriter.commit(commitMessages.toArray(new HoodieWriterCommitMessage[0]));

metaClient.reloadActiveTimeline();
Dataset<Row> result = HoodieClientTestUtils.read(jsc, basePath, sqlContext, metaClient.getFs(), partitionPathsAbs.toArray(new String[0]));
// verify output
assertOutput(totalInputRows, result, instantTime, Option.empty());
assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size, Option.empty(), Option.empty());

// verify extra metadata
Option<HoodieCommitMetadata> commitMetadataOption = HoodieClientTestUtils.getCommitMetadataForLatestInstant(metaClient);
assertTrue(commitMetadataOption.isPresent());
Map<String, String> actualExtraMetadata = new HashMap<>();
commitMetadataOption.get().getExtraMetadata().entrySet().stream().filter(entry ->
!entry.getKey().equals(HoodieCommitMetadata.SCHEMA_KEY)).forEach(entry -> actualExtraMetadata.put(entry.getKey(), entry.getValue()));
assertEquals(expectedExtraMetadata, actualExtraMetadata);
}

@Test
public void testDataSourceWriterExtraCommitMetadata() throws Exception {

String commitExtraMetaPrefix = "commit_extra_meta_";
Map<String, String> extraMeta = new HashMap<>();
extraMeta.put(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX_OPT_KEY().key(), commitExtraMetaPrefix);
extraMeta.put(commitExtraMetaPrefix + "a", "valA");
extraMeta.put(commitExtraMetaPrefix + "b", "valB");
extraMeta.put("commit_extra_c", "valC"); // should not be part of commit extra metadata

Map<String, String> expectedMetadata = new HashMap<>();
expectedMetadata.putAll(extraMeta);
expectedMetadata.remove(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX_OPT_KEY().key());
expectedMetadata.remove("commit_extra_c");

testDataSourceWriterInternal(extraMeta, expectedMetadata);
}

@Test
public void testDataSourceWriterEmptyExtraCommitMetadata() throws Exception {
String commitExtraMetaPrefix = "commit_extra_meta_";
Map<String, String> extraMeta = new HashMap<>();
extraMeta.put(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX_OPT_KEY().key(), commitExtraMetaPrefix);
extraMeta.put("keyA", "valA");
extraMeta.put("keyB", "valB");
extraMeta.put("commit_extra_c", "valC");
// none of the keys has the commit metadata key prefix, so no extra metadata is expected.
testDataSourceWriterInternal(extraMeta, Collections.EMPTY_MAP);
}
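End to end, the feature is driven purely by write options. Below is a hedged sketch of how a datasource write might attach extra commit metadata; the table path, column names, prefix value, and the plain-string option keys are assumptions that may vary across Hudi versions.

import org.apache.hudi.DataSourceWriteOptions;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

// Illustrative only: a bulk-insert row write that tags the resulting commit with extra metadata.
public class ExtraCommitMetadataWriteExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("extra-commit-metadata").getOrCreate();
    Dataset<Row> df = spark.read().parquet("/tmp/source_data"); // assumed input

    df.write().format("hudi")
        .option("hoodie.table.name", "trips")
        .option("hoodie.datasource.write.recordkey.field", "uuid")        // assumed column
        .option("hoodie.datasource.write.partitionpath.field", "region")  // assumed column
        .option("hoodie.datasource.write.operation", "bulk_insert")
        .option("hoodie.datasource.write.row.writer.enable", "true")      // routes through the v2 writers touched here
        .option(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX_OPT_KEY().key(), "commit_extra_meta_")
        .option("commit_extra_meta_ingestion_job", "nightly-batch")       // lands in the commit's extraMetadata
        .mode(SaveMode.Append)
        .save("/tmp/hudi/trips");

    spark.stop();
  }
}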

@Test
@@ -98,8 +148,7 @@ public void testMultipleDataSourceWrites() throws Exception {
String instantTime = "00" + i;
// init writer
HoodieDataSourceInternalWriter dataSourceInternalWriter =
new HoodieDataSourceInternalWriter(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, false);

new HoodieDataSourceInternalWriter(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, new DataSourceOptions(Collections.EMPTY_MAP), false);
List<HoodieWriterCommitMessage> commitMessages = new ArrayList<>();
Dataset<Row> totalInputRows = null;
DataWriter<InternalRow> writer = dataSourceInternalWriter.createWriterFactory().createDataWriter(partitionCounter++, RANDOM.nextLong(), RANDOM.nextLong());
@@ -142,8 +191,7 @@ public void testLargeWrites() throws Exception {
String instantTime = "00" + i;
// init writer
HoodieDataSourceInternalWriter dataSourceInternalWriter =
new HoodieDataSourceInternalWriter(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, false);

new HoodieDataSourceInternalWriter(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, new DataSourceOptions(Collections.EMPTY_MAP), false);
List<HoodieWriterCommitMessage> commitMessages = new ArrayList<>();
Dataset<Row> totalInputRows = null;
DataWriter<InternalRow> writer = dataSourceInternalWriter.createWriterFactory().createDataWriter(partitionCounter++, RANDOM.nextLong(), RANDOM.nextLong());
@@ -189,7 +237,7 @@ public void testAbort() throws Exception {
String instantTime0 = "00" + 0;
// init writer
HoodieDataSourceInternalWriter dataSourceInternalWriter =
new HoodieDataSourceInternalWriter(instantTime0, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, false);
new HoodieDataSourceInternalWriter(instantTime0, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, new DataSourceOptions(Collections.EMPTY_MAP), false);
DataWriter<InternalRow> writer = dataSourceInternalWriter.createWriterFactory().createDataWriter(0, RANDOM.nextLong(), RANDOM.nextLong());

List<String> partitionPaths = Arrays.asList(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS);
@@ -227,7 +275,7 @@ public void testAbort() throws Exception {
// 2nd batch. abort in the end
String instantTime1 = "00" + 1;
dataSourceInternalWriter =
new HoodieDataSourceInternalWriter(instantTime1, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, false);
new HoodieDataSourceInternalWriter(instantTime1, cfg, STRUCT_TYPE, sqlContext.sparkSession(), hadoopConf, new DataSourceOptions(Collections.EMPTY_MAP), false);
writer = dataSourceInternalWriter.createWriterFactory().createDataWriter(1, RANDOM.nextLong(), RANDOM.nextLong());

for (int j = 0; j < batches; j++) {
========== next changed file ==========
@@ -53,6 +53,6 @@ public Table getTable(StructType schema, Transform[] partitioning, Map<String, S
// 1st arg to createHoodieConfig is not really required to be set, but pass it anyway.
HoodieWriteConfig config = DataSourceUtils.createHoodieConfig(properties.get(HoodieWriteConfig.AVRO_SCHEMA.key()), path, tblName, properties);
return new HoodieDataSourceInternalTable(instantTime, config, schema, getSparkSession(),
getConfiguration(), arePartitionRecordsSorted);
getConfiguration(), properties, arePartitionRecordsSorted);
}
}
========== next changed file ==========
@@ -18,6 +18,7 @@

package org.apache.hudi.spark3.internal;

import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.client.HoodieInternalWriteStatus;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
@@ -33,7 +34,9 @@
import org.apache.spark.sql.types.StructType;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
@@ -47,15 +50,17 @@ public class HoodieDataSourceInternalBatchWrite implements BatchWrite {
private final StructType structType;
private final boolean arePartitionRecordsSorted;
private final DataSourceInternalWriterHelper dataSourceInternalWriterHelper;
private Map<String, String> extraMetadata = new HashMap<>();

public HoodieDataSourceInternalBatchWrite(String instantTime, HoodieWriteConfig writeConfig, StructType structType,
SparkSession jss, Configuration hadoopConfiguration, boolean arePartitionRecordsSorted) {
SparkSession jss, Configuration hadoopConfiguration, Map<String, String> properties, boolean arePartitionRecordsSorted) {
this.instantTime = instantTime;
this.writeConfig = writeConfig;
this.structType = structType;
this.arePartitionRecordsSorted = arePartitionRecordsSorted;
this.extraMetadata = DataSourceUtils.getExtraMetadata(properties);
this.dataSourceInternalWriterHelper = new DataSourceInternalWriterHelper(instantTime, writeConfig, structType,
jss, hadoopConfiguration);
jss, hadoopConfiguration, extraMetadata);
}

@Override
========== next changed file ==========
@@ -26,6 +26,8 @@
import org.apache.spark.sql.connector.write.WriteBuilder;
import org.apache.spark.sql.types.StructType;

import java.util.Map;

/**
* Implementation of {@link WriteBuilder} for datasource "hudi.spark3.internal" to be used in datasource implementation
* of bulk insert.
@@ -38,20 +40,22 @@ public class HoodieDataSourceInternalBatchWriteBuilder implements WriteBuilder {
private final SparkSession jss;
private final Configuration hadoopConfiguration;
private final boolean arePartitionRecordsSorted;
private final Map<String, String> properties;

public HoodieDataSourceInternalBatchWriteBuilder(String instantTime, HoodieWriteConfig writeConfig, StructType structType,
SparkSession jss, Configuration hadoopConfiguration, boolean arePartitionRecordsSorted) {
SparkSession jss, Configuration hadoopConfiguration, Map<String, String> properties, boolean arePartitionRecordsSorted) {
this.instantTime = instantTime;
this.writeConfig = writeConfig;
this.structType = structType;
this.jss = jss;
this.hadoopConfiguration = hadoopConfiguration;
this.arePartitionRecordsSorted = arePartitionRecordsSorted;
this.properties = properties;
}

@Override
public BatchWrite buildForBatch() {
return new HoodieDataSourceInternalBatchWrite(instantTime, writeConfig, structType, jss,
hadoopConfiguration, arePartitionRecordsSorted);
hadoopConfiguration, properties, arePartitionRecordsSorted);
}
}