@@ -31,7 +31,7 @@
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.testutils.HoodieWriteableTestTable;
import org.apache.hudi.testutils.HoodieSparkWriteableTestTable;

import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileStatus;
@@ -80,7 +80,7 @@ public void init() throws Exception {

// generate 200 records
Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
HoodieWriteableTestTable testTable = HoodieWriteableTestTable.of(HoodieCLI.getTableMetaClient(), schema);
HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(HoodieCLI.getTableMetaClient(), schema);

HoodieRecord[] hoodieRecords1 = SchemaTestUtil.generateHoodieTestRecords(0, 100, schema).toArray(new HoodieRecord[100]);
HoodieRecord[] hoodieRecords2 = SchemaTestUtil.generateHoodieTestRecords(100, 100, schema).toArray(new HoodieRecord[100]);
@@ -18,22 +18,24 @@

package org.apache.hudi.config;

import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.bootstrap.BootstrapMode;
import org.apache.hudi.common.config.DefaultHoodieConfig;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.client.common.EngineType;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
import org.apache.hudi.metrics.MetricsReporterType;
import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite;
import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;

import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

import javax.annotation.concurrent.Immutable;
@@ -59,6 +61,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
private static final long serialVersionUID = 0L;

public static final String TABLE_NAME = "hoodie.table.name";
public static final String PRECOMBINE_FIELD_PROP = "hoodie.datasource.write.precombine.field";
public static final String WRITE_PAYLOAD_CLASS = "hoodie.datasource.write.payload.class";
public static final String DEFAULT_WRITE_PAYLOAD_CLASS = OverwriteWithLatestAvroPayload.class.getName();
public static final String KEYGENERATOR_CLASS_PROP = "hoodie.datasource.write.keygenerator.class";
public static final String DEFAULT_KEYGENERATOR_CLASS = SimpleAvroKeyGenerator.class.getName();
public static final String DEFAULT_ROLLBACK_USING_MARKERS = "false";
public static final String ROLLBACK_USING_MARKERS = "hoodie.rollback.using.markers";
public static final String TIMELINE_LAYOUT_VERSION = "hoodie.timeline.layout.version";
@@ -194,6 +201,18 @@ public String getTableName() {
return props.getProperty(TABLE_NAME);
}

public String getPreCombineField() {
return props.getProperty(PRECOMBINE_FIELD_PROP);
}

public String getWritePayloadClass() {
return props.getProperty(WRITE_PAYLOAD_CLASS);
}

public String getKeyGeneratorClass() {
return props.getProperty(KEYGENERATOR_CLASS_PROP);
}

public Boolean shouldAutoCommit() {
return Boolean.parseBoolean(props.getProperty(HOODIE_AUTO_COMMIT_PROP));
}
@@ -902,6 +921,21 @@ public Builder forTable(String tableName) {
return this;
}

public Builder withPreCombineField(String preCombineField) {
props.setProperty(PRECOMBINE_FIELD_PROP, preCombineField);
return this;
}

public Builder withWritePayLoad(String payload) {
props.setProperty(WRITE_PAYLOAD_CLASS, payload);
return this;
}

public Builder withKeyGenerator(String keyGeneratorClass) {
props.setProperty(KEYGENERATOR_CLASS_PROP, keyGeneratorClass);
return this;
}

public Builder withTimelineLayoutVersion(int version) {
props.setProperty(TIMELINE_LAYOUT_VERSION, String.valueOf(version));
return this;
@@ -1094,6 +1128,10 @@ protected void setDefaults() {
setDefaultOnCondition(props, !props.containsKey(DELETE_PARALLELISM), DELETE_PARALLELISM, DEFAULT_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(ROLLBACK_PARALLELISM), ROLLBACK_PARALLELISM,
DEFAULT_ROLLBACK_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(KEYGENERATOR_CLASS_PROP),
KEYGENERATOR_CLASS_PROP, DEFAULT_KEYGENERATOR_CLASS);
setDefaultOnCondition(props, !props.containsKey(WRITE_PAYLOAD_CLASS),
WRITE_PAYLOAD_CLASS, DEFAULT_WRITE_PAYLOAD_CLASS);
setDefaultOnCondition(props, !props.containsKey(ROLLBACK_USING_MARKERS), ROLLBACK_USING_MARKERS,
DEFAULT_ROLLBACK_USING_MARKERS);
setDefaultOnCondition(props, !props.containsKey(COMBINE_BEFORE_INSERT_PROP), COMBINE_BEFORE_INSERT_PROP,
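For context, the new options can be wired through the builder as in the hedged sketch below; the base path, table name, and precombine field are illustrative placeholders rather than values from this change:

// Illustrative sketch only: exercises the builder methods added in this change.
// The path, table name, and field name are hypothetical placeholders.
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
    .withPath("/tmp/hoodie/sample_table")
    .forTable("sample_table")
    .withPreCombineField("ts")                                         // PRECOMBINE_FIELD_PROP
    .withWritePayLoad(OverwriteWithLatestAvroPayload.class.getName())  // WRITE_PAYLOAD_CLASS
    .withKeyGenerator(SimpleAvroKeyGenerator.class.getName())          // KEYGENERATOR_CLASS_PROP
    .build();

If withWritePayLoad or withKeyGenerator is omitted, setDefaults() now falls back to OverwriteWithLatestAvroPayload and SimpleAvroKeyGenerator respectively, matching the defaults declared above.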
@@ -21,10 +21,8 @@

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.HoodieAvroWriteSupport;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.client.common.TaskContextSupplier;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.bloom.BloomFilterTypeCode;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -37,7 +35,6 @@
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.io.storage.HoodieAvroParquetConfig;
import org.apache.hudi.io.storage.HoodieParquetWriter;
import org.apache.hudi.table.HoodieTable;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
@@ -57,43 +54,22 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;

import static org.apache.hudi.common.testutils.FileCreateUtils.baseFileName;

public class HoodieWriteableTestTable extends HoodieTestTable {
private static final Logger LOG = LogManager.getLogger(HoodieWriteableTestTable.class);

private final Schema schema;
private final BloomFilter filter;
protected final Schema schema;
protected final BloomFilter filter;

private HoodieWriteableTestTable(String basePath, FileSystem fs, HoodieTableMetaClient metaClient, Schema schema, BloomFilter filter) {
protected HoodieWriteableTestTable(String basePath, FileSystem fs, HoodieTableMetaClient metaClient, Schema schema, BloomFilter filter) {
super(basePath, fs, metaClient);
this.schema = schema;
this.filter = filter;
}

public static HoodieWriteableTestTable of(HoodieTableMetaClient metaClient, Schema schema, BloomFilter filter) {
return new HoodieWriteableTestTable(metaClient.getBasePath(), metaClient.getRawFs(), metaClient, schema, filter);
}

public static HoodieWriteableTestTable of(HoodieTableMetaClient metaClient, Schema schema) {
BloomFilter filter = BloomFilterFactory
.createBloomFilter(10000, 0.0000001, -1, BloomFilterTypeCode.SIMPLE.name());
return of(metaClient, schema, filter);
}

public static HoodieWriteableTestTable of(HoodieTable hoodieTable, Schema schema) {
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
return of(metaClient, schema);
}

public static HoodieWriteableTestTable of(HoodieTable hoodieTable, Schema schema, BloomFilter filter) {
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
return of(metaClient, schema, filter);
}

@Override
public HoodieWriteableTestTable addCommit(String instantTime) throws Exception {
return (HoodieWriteableTestTable) super.addCommit(instantTime);
@@ -104,29 +80,7 @@ public HoodieWriteableTestTable forCommit(String instantTime) {
return (HoodieWriteableTestTable) super.forCommit(instantTime);
}

public String getFileIdWithInserts(String partition) throws Exception {
return getFileIdWithInserts(partition, new HoodieRecord[0]);
}

public String getFileIdWithInserts(String partition, HoodieRecord... records) throws Exception {
return getFileIdWithInserts(partition, Arrays.asList(records));
}

public String getFileIdWithInserts(String partition, List<HoodieRecord> records) throws Exception {
String fileId = UUID.randomUUID().toString();
withInserts(partition, fileId, records);
return fileId;
}

public HoodieWriteableTestTable withInserts(String partition, String fileId) throws Exception {
return withInserts(partition, fileId, new HoodieRecord[0]);
}

public HoodieWriteableTestTable withInserts(String partition, String fileId, HoodieRecord... records) throws Exception {
return withInserts(partition, fileId, Arrays.asList(records));
}

public HoodieWriteableTestTable withInserts(String partition, String fileId, List<HoodieRecord> records) throws Exception {
public HoodieWriteableTestTable withInserts(String partition, String fileId, List<HoodieRecord> records, TaskContextSupplier contextSupplier) throws Exception {
Contributor review comment: This line is too long; it would be better if we could break it. But it does not matter; let's refactor it next time.


FileCreateUtils.createPartitionMetaFile(basePath, partition);
String fileName = baseFileName(currentInstantTime, fileId);

@@ -138,7 +92,7 @@ public HoodieWriteableTestTable withInserts(String partition, String fileId, Lis
try (HoodieParquetWriter writer = new HoodieParquetWriter(
currentInstantTime,
new Path(Paths.get(basePath, partition, fileName).toString()),
config, schema, new SparkTaskContextSupplier())) {
config, schema, contextSupplier)) {
int seqId = 1;
for (HoodieRecord record : records) {
GenericRecord avroRecord = (GenericRecord) record.getData().getInsertValue(schema).get();
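Since the SparkTaskContextSupplier is no longer hard-coded, callers now pass a TaskContextSupplier explicitly. A minimal sketch of a Spark-side call, assuming metaClient, schema, and a prepared List<HoodieRecord> records are in scope as in the existing test harnesses (the instant time, partition path, and file id are placeholders):

// Sketch only: the instant time, partition path, and file id are hypothetical.
HoodieSparkWriteableTestTable testTable = HoodieSparkWriteableTestTable.of(metaClient, schema);
testTable.addCommit("001")
    .withInserts("2020/01/01", "file-1", records, new SparkTaskContextSupplier());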
22 changes: 22 additions & 0 deletions hudi-client/hudi-flink-client/pom.xml
@@ -115,6 +115,28 @@
<scope>test</scope>
</dependency>

<!-- Flink - Tests -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<classifier>tests</classifier>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<classifier>tests</classifier>
</dependency>

<!-- Test -->
<dependency>
<groupId>org.junit.jupiter</groupId>
@@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.testutils;

import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestUtils;

import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestInfo;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class HoodieFlinkClientTestHarness extends HoodieCommonTestHarness implements Serializable {

protected static final Logger LOG = LogManager.getLogger(HoodieFlinkClientTestHarness.class);
Contributor review comment: It would be better to split the non-static and static fields; moreover, it is a logger.

private String testMethodName;
protected transient Configuration hadoopConf = null;
protected transient FileSystem fs;
protected transient MiniClusterWithClientResource flinkCluster = null;

@BeforeEach
public void setTestMethodName(TestInfo testInfo) {
if (testInfo.getTestMethod().isPresent()) {
testMethodName = testInfo.getTestMethod().get().getName();
} else {
testMethodName = "Unknown";
}
}

protected void initFlinkMiniCluster() {
flinkCluster = new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberSlotsPerTaskManager(2)
.setNumberTaskManagers(1)
.build());
}

protected void initFileSystem() {
hadoopConf = new Configuration();
initFileSystemWithConfiguration(hadoopConf);
}

private void initFileSystemWithConfiguration(Configuration configuration) {
if (basePath == null) {
throw new IllegalStateException("The base path has not been initialized.");
}
fs = FSUtils.getFs(basePath, configuration);
if (fs instanceof LocalFileSystem) {
LocalFileSystem lfs = (LocalFileSystem) fs;
// With LocalFileSystem, with checksum disabled, fs.open() returns an inputStream which is FSInputStream
// This causes ClassCastExceptions in LogRecordScanner (and potentially other places) calling fs.open
// So, for the tests, we enforce checksum verification to circumvent the problem
lfs.setVerifyChecksum(true);
}
}

/**
* Initializes an instance of {@link HoodieTableMetaClient} with a special table type specified by
* {@code getTableType()}.
*
* @throws IOException
*/
protected void initMetaClient() throws IOException {
initMetaClient(getTableType());
}

protected void initMetaClient(HoodieTableType tableType) throws IOException {
if (basePath == null) {
throw new IllegalStateException("The base path has not been initialized.");
}
metaClient = HoodieTestUtils.init(hadoopConf, basePath, tableType);
}


/**
* Cleanups file system.
*
* @throws IOException
*/
protected void cleanupFileSystem() throws IOException {
if (fs != null) {
LOG.warn("Closing file-system instance used in previous test-run");
fs.close();
fs = null;
}
}

protected void cleanupFlinkMiniCluster() {
if (flinkCluster != null) {
flinkCluster.after();
flinkCluster = null;
}
}

public static class SimpleTestSinkFunction implements SinkFunction<HoodieRecord> {

// must be static
public static List<HoodieRecord> valuesList = new ArrayList<>();

@Override
public synchronized void invoke(HoodieRecord value, Context context) throws Exception {
valuesList.add(value);
}
}
}
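For illustration, a Flink-client test would typically drive this harness along the following lines; the test class name and lifecycle wiring are assumptions rather than code from this change (initPath() is assumed to come from HoodieCommonTestHarness, and the JUnit 5 annotations are assumed to be imported):

// Illustrative sketch of a test built on the new harness; not part of this change.
public class TestSomeFlinkClientFeature extends HoodieFlinkClientTestHarness {

  @BeforeEach
  public void setUp() throws Exception {
    initPath();               // assumed helper from HoodieCommonTestHarness that sets basePath
    initFileSystem();         // builds hadoopConf and fs against basePath
    initMetaClient();         // creates the HoodieTableMetaClient for getTableType()
    initFlinkMiniCluster();   // one task manager with two slots, as configured above
    flinkCluster.before();    // assumed: start the mini cluster, since initFlinkMiniCluster() only constructs it
  }

  @AfterEach
  public void tearDown() throws IOException {
    cleanupFileSystem();
    cleanupFlinkMiniCluster();
  }
}

Records collected by SimpleTestSinkFunction (attached to a pipeline via DataStream#addSink) can then be asserted on through its static valuesList once the job finishes.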
@@ -62,7 +62,7 @@
import org.apache.hudi.table.action.commit.SparkWriteHelper;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.testutils.HoodieWriteableTestTable;
import org.apache.hudi.testutils.HoodieSparkWriteableTestTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
@@ -114,7 +114,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {

@BeforeEach
public void setUpTestTable() {
testTable = HoodieWriteableTestTable.of(metaClient);
testTable = HoodieSparkWriteableTestTable.of(metaClient);
}

/**