Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ public static Option<EmbeddedTimelineService> createEmbeddedTimelineService(
LOG.info("Starting Timeline service !!");
Option<String> hostAddr = context.getProperty(EngineProperty.EMBEDDED_SERVER_HOST);
timelineServer = Option.of(new EmbeddedTimelineService(context, hostAddr.orElse(null), config.getEmbeddedTimelineServerPort(),
config.getMetadataConfig(), config.getClientSpecifiedViewStorageConfig(), config.getBasePath()));
config.getMetadataConfig(), config.getClientSpecifiedViewStorageConfig(), config.getBasePath(),
config.getEmbeddedTimelineServerThreads(), config.getEmbeddedTimelineServerCompressOutput(),
config.getEmbeddedTimelineServerUseAsync()));
timelineServer.get().startServer();
updateWriteConfigWithTimelineServer(timelineServer.get(), config);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,15 @@ public class EmbeddedTimelineService {
private final HoodieMetadataConfig metadataConfig;
private final String basePath;

private final int numThreads;
private final boolean shouldCompressOutput;
private final boolean useAsync;
private transient FileSystemViewManager viewManager;
private transient TimelineService server;

public EmbeddedTimelineService(HoodieEngineContext context, String embeddedTimelineServiceHostAddr, int embeddedTimelineServerPort,
HoodieMetadataConfig metadataConfig, FileSystemViewStorageConfig config, String basePath) {
HoodieMetadataConfig metadataConfig, FileSystemViewStorageConfig config, String basePath,
int numThreads, boolean compressOutput, boolean useAsync) {
setHostAddr(embeddedTimelineServiceHostAddr);
this.context = context;
this.config = config;
Expand All @@ -61,6 +65,9 @@ public EmbeddedTimelineService(HoodieEngineContext context, String embeddedTimel
this.hadoopConf = context.getHadoopConf();
this.viewManager = createViewManager();
this.preferredPort = embeddedTimelineServerPort;
this.numThreads = numThreads;
this.shouldCompressOutput = compressOutput;
this.useAsync = useAsync;
}

private FileSystemViewManager createViewManager() {
Expand All @@ -77,7 +84,7 @@ private FileSystemViewManager createViewManager() {
}

public void startServer() throws IOException {
server = new TimelineService(preferredPort, viewManager, hadoopConf.newCopy());
server = new TimelineService(preferredPort, viewManager, hadoopConf.newCopy(), numThreads, shouldCompressOutput, useAsync);
serverPort = server.startService();
LOG.info("Started embedded timeline server at " + hostAddr + ":" + serverPort);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,12 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
public static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED = "true";
public static final String EMBEDDED_TIMELINE_SERVER_PORT = "hoodie.embed.timeline.server.port";
public static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_PORT = "0";
public static final String EMBEDDED_TIMELINE_SERVER_THREADS = "hoodie.embed.timeline.server.threads";
public static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_THREADS = "-1";
public static final String EMBEDDED_TIMELINE_SERVER_COMPRESS_OUTPUT = "hoodie.embed.timeline.server.gzip";
public static final String DEFAULT_EMBEDDED_TIMELINE_COMPRESS_OUTPUT = "true";
public static final String EMBEDDED_TIMELINE_SERVER_USE_ASYNC = "hoodie.embed.timeline.server.async";
public static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_ASYNC = "false";

public static final String FAIL_ON_TIMELINE_ARCHIVING_ENABLED_PROP = "hoodie.fail.on.timeline.archiving";
public static final String DEFAULT_FAIL_ON_TIMELINE_ARCHIVING_ENABLED = "true";
Expand Down Expand Up @@ -309,6 +315,18 @@ public int getEmbeddedTimelineServerPort() {
return Integer.parseInt(props.getProperty(EMBEDDED_TIMELINE_SERVER_PORT, DEFAULT_EMBEDDED_TIMELINE_SERVER_PORT));
}

public int getEmbeddedTimelineServerThreads() {
return Integer.parseInt(props.getProperty(EMBEDDED_TIMELINE_SERVER_THREADS, DEFAULT_EMBEDDED_TIMELINE_SERVER_THREADS));
}

public boolean getEmbeddedTimelineServerCompressOutput() {
return Boolean.parseBoolean(props.getProperty(EMBEDDED_TIMELINE_SERVER_COMPRESS_OUTPUT, DEFAULT_EMBEDDED_TIMELINE_COMPRESS_OUTPUT));
}

public boolean getEmbeddedTimelineServerUseAsync() {
return Boolean.parseBoolean(props.getProperty(EMBEDDED_TIMELINE_SERVER_USE_ASYNC, DEFAULT_EMBEDDED_TIMELINE_SERVER_ASYNC));
}

public boolean isFailOnTimelineArchivingEnabled() {
return Boolean.parseBoolean(props.getProperty(FAIL_ON_TIMELINE_ARCHIVING_ENABLED_PROP));
}
Expand Down Expand Up @@ -476,7 +494,7 @@ public String getClusteringExecutionStrategyClass() {
public long getClusteringMaxBytesInGroup() {
return Long.parseLong(props.getProperty(HoodieClusteringConfig.CLUSTERING_MAX_BYTES_PER_GROUP));
}

public long getClusteringSmallFileLimit() {
return Long.parseLong(props.getProperty(HoodieClusteringConfig.CLUSTERING_PLAN_SMALL_FILE_LIMIT));
}
Expand All @@ -492,7 +510,7 @@ public long getClusteringTargetFileMaxBytes() {
public int getTargetPartitionsForClustering() {
return Integer.parseInt(props.getProperty(HoodieClusteringConfig.CLUSTERING_TARGET_PARTITIONS));
}

public String getClusteringSortColumns() {
return props.getProperty(HoodieClusteringConfig.CLUSTERING_SORT_COLUMNS_PROPERTY);
}
Expand Down
Loading