Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkTable;
Expand Down Expand Up @@ -72,6 +73,8 @@ public void init() throws Exception {
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
.withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withRemoteServerPort(timelineServicePort).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
.forTable("test-trip-table").build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.NumericUtils;
Expand Down Expand Up @@ -209,6 +210,8 @@ public void testShowArchivedCommits() throws Exception {
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath1)
.withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withRemoteServerPort(timelineServicePort).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
.forTable("test-trip-table").build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.testutils.CompactionTestUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
Expand Down Expand Up @@ -159,6 +160,8 @@ private void generateArchive() throws IOException {
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
.withSchema(HoodieTestCommitMetadataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(2, 3).build())
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withRemoteServerPort(timelineServicePort).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
.forTable("test-trip-table").build();
// archive
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.testutils.FileCreateUtils;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.testutils.HoodieClientTestUtils;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
Expand Down Expand Up @@ -62,6 +64,8 @@ public void init() throws Exception {
new TableCommand().createTable(
tablePath, tableName, HoodieTableType.COPY_ON_WRITE.name(),
"", TimelineLayoutVersion.VERSION_1, "org.apache.hudi.common.model.HoodieAvroPayload");
timelineService = HoodieClientTestUtils.initTimelineService(
context, basePath(), FileSystemViewStorageConfig.REMOTE_PORT_NUM.defaultValue());
metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
//Create some commits files and base files
HoodieTestTable.of(metaClient)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
import org.apache.hudi.client.HoodieReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.hudi.testutils.providers.SparkProvider;
import org.apache.hudi.timeline.service.TimelineService;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
Expand All @@ -39,10 +42,13 @@

public class CLIFunctionalTestHarness implements SparkProvider {

protected static int timelineServicePort =
FileSystemViewStorageConfig.REMOTE_PORT_NUM.defaultValue();
protected static transient TimelineService timelineService;
protected static transient HoodieSparkEngineContext context;
private static transient SparkSession spark;
private static transient SQLContext sqlContext;
private static transient JavaSparkContext jsc;
private static transient HoodieSparkEngineContext context;
private static transient JLineShellComponent shell;
/**
* An indicator of the initialization status.
Expand Down Expand Up @@ -107,6 +113,9 @@ public synchronized void runBeforeEach() {
jsc = new JavaSparkContext(spark.sparkContext());
context = new HoodieSparkEngineContext(jsc);
shell = new Bootstrap().getJLineShellComponent();
timelineService = HoodieClientTestUtils.initTimelineService(
context, basePath(), incrementTimelineServicePortToUse());
timelineServicePort = timelineService.getServerPort();
}
}

Expand All @@ -120,14 +129,25 @@ public static synchronized void cleanUpAfterAll() {
shell.stop();
shell = null;
}
if (timelineService != null) {
timelineService.close();
}
}

/**
 * Helper to prepare a string for matching.
 *
 * <p>Collapses every maximal run of non-word characters (including all
 * whitespace) into a single comma, e.g. {@code "a  b.c"} becomes {@code "a,b,c"}.
 *
 * @param str Input string.
 * @return pruned string with runs of non-word characters replaced by commas.
 */
protected static String removeNonWordAndStripSpace(String str) {
  // A single pass suffices: whitespace (\s) is a subset of non-word (\W), and
  // the commas produced by a whitespace pass are themselves non-word characters,
  // so the former two-step replace ("\\s+" then "\\W+") was redundant.
  return str.replaceAll("[\\W]+", ",");
}

/**
 * Advances the timeline service port so each individual test gets a fresh
 * port, avoiding failures caused by reusing a port whose previous server
 * socket has not been fully released.
 *
 * @return the next port to use, kept inside [1024, 65535] with wrap-around.
 */
protected int incrementTimelineServicePortToUse() {
  final int minPort = 1024;
  final int portRange = 65536 - minPort;
  // Same wrap-around arithmetic as before, spelled out with named constants.
  timelineServicePort = ((timelineServicePort + 1 - minPort) % portRange) + minPort;
  return timelineServicePort;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.hudi.config.metrics.HoodieMetricsGraphiteConfig;
import org.apache.hudi.config.metrics.HoodieMetricsJmxConfig;
import org.apache.hudi.config.metrics.HoodieMetricsPrometheusConfig;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
Expand Down Expand Up @@ -248,14 +249,17 @@ public class HoodieWriteConfig extends HoodieConfig {

public static final ConfigProperty<String> MARKERS_TYPE = ConfigProperty
.key("hoodie.write.markers.type")
.defaultValue(MarkerType.DIRECT.toString())
.defaultValue(MarkerType.TIMELINE_SERVER_BASED.toString())
.sinceVersion("0.9.0")
.withDocumentation("Marker type to use. Two modes are supported: "
+ "- DIRECT: individual marker file corresponding to each data file is directly "
+ "created by the writer. "
+ "- TIMELINE_SERVER_BASED: marker operations are all handled at the timeline service "
+ "which serves as a proxy. New marker entries are batch processed and stored "
+ "in a limited number of underlying files for efficiency.");
+ "in a limited number of underlying files for efficiency. If HDFS is used or "
+ "timeline server is disabled, DIRECT markers are used as fallback even if this "
+ "is configured. For Spark structured streaming, this configuration does not "
+ "take effect, i.e., DIRECT markers are always used for Spark structured streaming.");

public static final ConfigProperty<Integer> MARKERS_TIMELINE_SERVER_BASED_BATCH_NUM_THREADS = ConfigProperty
.key("hoodie.markers.timeline_server_based.batch.num_threads")
Expand Down Expand Up @@ -2159,6 +2163,7 @@ public Builder withProperties(Properties properties) {
}

protected void setDefaults() {
writeConfig.setDefaultValue(MARKERS_TYPE, getDefaultMarkersType(engineType));
// Check for mandatory properties
writeConfig.setDefaults(HoodieWriteConfig.class.getName());
// Make sure the props is propagated
Expand Down Expand Up @@ -2213,5 +2218,18 @@ public HoodieWriteConfig build() {
// Build WriteConfig at the end
return new HoodieWriteConfig(engineType, writeConfig.getProps());
}

/**
 * Resolves the default marker type for the given engine.
 *
 * @param engineType Engine in use (Spark, Flink, or Java).
 * @return {@code TIMELINE_SERVER_BASED} for Spark; {@code DIRECT} for Flink
 *     and Java, which do not support timeline-server-based markers.
 * @throws HoodieNotSupportedException if the engine type is not recognized.
 */
private String getDefaultMarkersType(EngineType engineType) {
  switch (engineType) {
    case FLINK:
    case JAVA:
      // Timeline-server-based markers are not supported for Flink and Java engines
      return MarkerType.DIRECT.toString();
    case SPARK:
      return MarkerType.TIMELINE_SERVER_BASED.toString();
    default:
      throw new HoodieNotSupportedException("Unsupported engine " + engineType);
  }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
package org.apache.hudi.table.marker;

import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieRemoteException;
import org.apache.hudi.table.HoodieTable;

Expand Down Expand Up @@ -132,18 +134,24 @@ protected Option<Path> create(String partitionPath, String dataFileName, IOType

Map<String, String> paramsMap = new HashMap<>();
paramsMap.put(MARKER_DIR_PATH_PARAM, markerDirPath.toString());
paramsMap.put(MARKER_NAME_PARAM, partitionPath + "/" + markerFileName);
if (StringUtils.isNullOrEmpty(partitionPath)) {
paramsMap.put(MARKER_NAME_PARAM, markerFileName);
} else {
paramsMap.put(MARKER_NAME_PARAM, partitionPath + "/" + markerFileName);
}

boolean success;
try {
success = executeRequestToTimelineServer(
CREATE_MARKER_URL, paramsMap, new TypeReference<Boolean>() {}, RequestMethod.POST);
CREATE_MARKER_URL, paramsMap, new TypeReference<Boolean>() {
}, RequestMethod.POST);
} catch (IOException e) {
throw new HoodieRemoteException("Failed to create marker file " + partitionPath + "/" + markerFileName, e);
}
LOG.info("[timeline-server-based] Created marker file " + partitionPath + "/" + markerFileName
+ " in " + timer.endTimer() + " ms");
if (success) {
return Option.of(new Path(new Path(markerDirPath, partitionPath), markerFileName));
return Option.of(new Path(FSUtils.getPartitionPath(markerDirPath, partitionPath), markerFileName));
} else {
return Option.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@

package org.apache.hudi.table.marker;

import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.StorageSchemes;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.HoodieTable;

import com.esotericsoftware.minlog.Log;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

Expand All @@ -43,6 +46,18 @@ public static WriteMarkers get(MarkerType markerType, HoodieTable table, String
case DIRECT:
return new DirectWriteMarkers(table, instantTime);
case TIMELINE_SERVER_BASED:
if (!table.getConfig().isEmbeddedTimelineServerEnabled()) {
Log.warn("Timeline-server-based markers are configured as the marker type "
+ "but embedded timeline server is not enabled. Falling back to direct markers.");
return new DirectWriteMarkers(table, instantTime);
}
String basePath = table.getMetaClient().getBasePath();
if (StorageSchemes.HDFS.getScheme().equals(
FSUtils.getFs(basePath, table.getContext().getHadoopConf().newCopy()).getScheme())) {
Log.warn("Timeline-server-based markers are not supported for HDFS: "
+ "base path " + basePath + ". Falling back to direct markers.");
return new DirectWriteMarkers(table, instantTime);
}
return new TimelineServerBasedWriteMarkers(table, instantTime);
default:
throw new HoodieException("The marker type \"" + markerType.name() + "\" is not supported.");
Expand Down
Loading