diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineServerHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineServerHelper.java index fa74aa34eda66..e5a719eb3d8b5 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineServerHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineServerHelper.java @@ -50,7 +50,9 @@ public static Option createEmbeddedTimelineService( LOG.info("Starting Timeline service !!"); Option hostAddr = context.getProperty(EngineProperty.EMBEDDED_SERVER_HOST); timelineServer = Option.of(new EmbeddedTimelineService(context, hostAddr.orElse(null), config.getEmbeddedTimelineServerPort(), - config.getMetadataConfig(), config.getClientSpecifiedViewStorageConfig(), config.getBasePath())); + config.getMetadataConfig(), config.getClientSpecifiedViewStorageConfig(), config.getBasePath(), + config.getEmbeddedTimelineServerThreads(), config.getEmbeddedTimelineServerCompressOutput(), + config.getEmbeddedTimelineServerUseAsync())); timelineServer.get().startServer(); updateWriteConfigWithTimelineServer(timelineServer.get(), config); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java index 386f7d543dea8..a2bc7116d32ba 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java @@ -48,11 +48,15 @@ public class EmbeddedTimelineService { private final HoodieMetadataConfig metadataConfig; private final String basePath; + private final int numThreads; + private final boolean shouldCompressOutput; + private final boolean useAsync; private transient FileSystemViewManager viewManager; private transient TimelineService server; public EmbeddedTimelineService(HoodieEngineContext context, String embeddedTimelineServiceHostAddr, int embeddedTimelineServerPort, - HoodieMetadataConfig metadataConfig, FileSystemViewStorageConfig config, String basePath) { + HoodieMetadataConfig metadataConfig, FileSystemViewStorageConfig config, String basePath, + int numThreads, boolean compressOutput, boolean useAsync) { setHostAddr(embeddedTimelineServiceHostAddr); this.context = context; this.config = config; @@ -61,6 +65,9 @@ public EmbeddedTimelineService(HoodieEngineContext context, String embeddedTimel this.hadoopConf = context.getHadoopConf(); this.viewManager = createViewManager(); this.preferredPort = embeddedTimelineServerPort; + this.numThreads = numThreads; + this.shouldCompressOutput = compressOutput; + this.useAsync = useAsync; } private FileSystemViewManager createViewManager() { @@ -77,7 +84,7 @@ private FileSystemViewManager createViewManager() { } public void startServer() throws IOException { - server = new TimelineService(preferredPort, viewManager, hadoopConf.newCopy()); + server = new TimelineService(preferredPort, viewManager, hadoopConf.newCopy(), numThreads, shouldCompressOutput, useAsync); serverPort = server.startService(); LOG.info("Started embedded timeline server at " + hostAddr + ":" + serverPort); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 3fc5a2df38985..5a7a89200c3b0 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -111,6 +111,12 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { public static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED = "true"; public static final String EMBEDDED_TIMELINE_SERVER_PORT = "hoodie.embed.timeline.server.port"; public static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_PORT = "0"; + public static final String EMBEDDED_TIMELINE_SERVER_THREADS = "hoodie.embed.timeline.server.threads"; + public static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_THREADS = "-1"; + public static final String EMBEDDED_TIMELINE_SERVER_COMPRESS_OUTPUT = "hoodie.embed.timeline.server.gzip"; + public static final String DEFAULT_EMBEDDED_TIMELINE_COMPRESS_OUTPUT = "true"; + public static final String EMBEDDED_TIMELINE_SERVER_USE_ASYNC = "hoodie.embed.timeline.server.async"; + public static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_ASYNC = "false"; public static final String FAIL_ON_TIMELINE_ARCHIVING_ENABLED_PROP = "hoodie.fail.on.timeline.archiving"; public static final String DEFAULT_FAIL_ON_TIMELINE_ARCHIVING_ENABLED = "true"; @@ -309,6 +315,18 @@ public int getEmbeddedTimelineServerPort() { return Integer.parseInt(props.getProperty(EMBEDDED_TIMELINE_SERVER_PORT, DEFAULT_EMBEDDED_TIMELINE_SERVER_PORT)); } + public int getEmbeddedTimelineServerThreads() { + return Integer.parseInt(props.getProperty(EMBEDDED_TIMELINE_SERVER_THREADS, DEFAULT_EMBEDDED_TIMELINE_SERVER_THREADS)); + } + + public boolean getEmbeddedTimelineServerCompressOutput() { + return Boolean.parseBoolean(props.getProperty(EMBEDDED_TIMELINE_SERVER_COMPRESS_OUTPUT, DEFAULT_EMBEDDED_TIMELINE_COMPRESS_OUTPUT)); + } + + public boolean getEmbeddedTimelineServerUseAsync() { + return Boolean.parseBoolean(props.getProperty(EMBEDDED_TIMELINE_SERVER_USE_ASYNC, DEFAULT_EMBEDDED_TIMELINE_SERVER_ASYNC)); + } + public boolean isFailOnTimelineArchivingEnabled() { return Boolean.parseBoolean(props.getProperty(FAIL_ON_TIMELINE_ARCHIVING_ENABLED_PROP)); } @@ -476,7 +494,7 @@ public String getClusteringExecutionStrategyClass() { public long getClusteringMaxBytesInGroup() { return Long.parseLong(props.getProperty(HoodieClusteringConfig.CLUSTERING_MAX_BYTES_PER_GROUP)); } - + public long getClusteringSmallFileLimit() { return Long.parseLong(props.getProperty(HoodieClusteringConfig.CLUSTERING_PLAN_SMALL_FILE_LIMIT)); } @@ -492,7 +510,7 @@ public long getClusteringTargetFileMaxBytes() { public int getTargetPartitionsForClustering() { return Integer.parseInt(props.getProperty(HoodieClusteringConfig.CLUSTERING_TARGET_PARTITIONS)); } - + public String getClusteringSortColumns() { return props.getProperty(HoodieClusteringConfig.CLUSTERING_SORT_COLUMNS_PROPERTY); } diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java index 22d3082c6d961..09ebeb5cce3b1 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java @@ -18,6 +18,7 @@ package org.apache.hudi.timeline.service; +import org.apache.hudi.common.metrics.Registry; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.dto.BaseFileDTO; import org.apache.hudi.common.table.timeline.dto.ClusteringOpDTO; @@ -29,7 +30,9 @@ import org.apache.hudi.common.table.view.FileSystemViewManager; import org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView; import org.apache.hudi.common.table.view.SyncableFileSystemView; +import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.timeline.service.handlers.BaseFileHandler; import org.apache.hudi.timeline.service.handlers.FileSliceHandler; import org.apache.hudi.timeline.service.handlers.TimelineHandler; @@ -47,6 +50,9 @@ import java.io.IOException; import java.util.Arrays; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.stream.Collectors; /** @@ -62,13 +68,24 @@ public class RequestHandler { private final TimelineHandler instantHandler; private final FileSliceHandler sliceHandler; private final BaseFileHandler dataFileHandler; + private Registry metricsRegistry = Registry.getRegistry("TimelineService"); + private ScheduledExecutorService asyncResultService = Executors.newSingleThreadScheduledExecutor(); + private final boolean useAsync; - public RequestHandler(Javalin app, Configuration conf, FileSystemViewManager viewManager) throws IOException { + public RequestHandler(Javalin app, Configuration conf, FileSystemViewManager viewManager, boolean useAsync) throws IOException { this.viewManager = viewManager; this.app = app; this.instantHandler = new TimelineHandler(conf, viewManager); this.sliceHandler = new FileSliceHandler(conf, viewManager); this.dataFileHandler = new BaseFileHandler(conf, viewManager); + this.useAsync = useAsync; + if (useAsync) { + asyncResultService = Executors.newSingleThreadScheduledExecutor(); + } + } + + public RequestHandler(Javalin app, Configuration conf, FileSystemViewManager viewManager) throws IOException { + this(app, conf, viewManager, false); } public void register() { @@ -130,13 +147,44 @@ private boolean syncIfLocalViewBehind(Context ctx) { } private void writeValueAsString(Context ctx, Object obj) throws JsonProcessingException { + if (useAsync) { + writeValueAsStringAsync(ctx, obj); + } else { + writeValueAsStringSync(ctx, obj); + } + } + + private void writeValueAsStringSync(Context ctx, Object obj) throws JsonProcessingException { + HoodieTimer timer = new HoodieTimer().startTimer(); boolean prettyPrint = ctx.queryParam("pretty") != null; - long beginJsonTs = System.currentTimeMillis(); String result = prettyPrint ? OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(obj) : OBJECT_MAPPER.writeValueAsString(obj); - long endJsonTs = System.currentTimeMillis(); - LOG.debug("Jsonify TimeTaken=" + (endJsonTs - beginJsonTs)); + final long jsonifyTime = timer.endTimer(); ctx.result(result); + metricsRegistry.add("WRITE_VALUE_CNT", 1); + metricsRegistry.add("WRITE_VALUE_TIME", jsonifyTime); + if (LOG.isDebugEnabled()) { + LOG.debug("Jsonify TimeTaken=" + jsonifyTime); + } + } + + private void writeValueAsStringAsync(Context ctx, Object obj) throws JsonProcessingException { + ctx.result(CompletableFuture.supplyAsync(() -> { + HoodieTimer timer = new HoodieTimer().startTimer(); + boolean prettyPrint = ctx.queryParam("pretty") != null; + try { + String result = prettyPrint ? OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(obj) : OBJECT_MAPPER.writeValueAsString(obj); + final long jsonifyTime = timer.endTimer(); + metricsRegistry.add("WRITE_VALUE_CNT", 1); + metricsRegistry.add("WRITE_VALUE_TIME", jsonifyTime); + if (LOG.isDebugEnabled()) { + LOG.debug("Jsonify TimeTaken=" + jsonifyTime); + } + return result; + } catch (JsonProcessingException e) { + throw new HoodieException("Failed to JSON encode the value", e); + } + }, asyncResultService)); } /** @@ -144,12 +192,14 @@ private void writeValueAsString(Context ctx, Object obj) throws JsonProcessingEx */ private void registerTimelineAPI() { app.get(RemoteHoodieTableFileSystemView.LAST_INSTANT, new ViewHandler(ctx -> { + metricsRegistry.add("LAST_INSTANT", 1); List dtos = instantHandler .getLastInstant(ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getValue()); writeValueAsString(ctx, dtos); }, false)); app.get(RemoteHoodieTableFileSystemView.TIMELINE, new ViewHandler(ctx -> { + metricsRegistry.add("TIMELINE", 1); TimelineDTO dto = instantHandler .getTimeline(ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getValue()); writeValueAsString(ctx, dto); @@ -161,6 +211,7 @@ private void registerTimelineAPI() { */ private void registerDataFilesAPI() { app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_DATA_FILES_URL, new ViewHandler(ctx -> { + metricsRegistry.add("LATEST_PARTITION_DATA_FILES", 1); List dtos = dataFileHandler.getLatestDataFiles( ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(), ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,"")); @@ -168,6 +219,7 @@ private void registerDataFilesAPI() { }, true)); app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_DATA_FILE_URL, new ViewHandler(ctx -> { + metricsRegistry.add("LATEST_PARTITION_DATA_FILE", 1); List dtos = dataFileHandler.getLatestDataFile( ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(), ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""), @@ -176,12 +228,14 @@ private void registerDataFilesAPI() { }, true)); app.get(RemoteHoodieTableFileSystemView.LATEST_ALL_DATA_FILES, new ViewHandler(ctx -> { + metricsRegistry.add("LATEST_ALL_DATA_FILES", 1); List dtos = dataFileHandler .getLatestDataFiles(ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow()); writeValueAsString(ctx, dtos); }, true)); app.get(RemoteHoodieTableFileSystemView.LATEST_DATA_FILES_BEFORE_ON_INSTANT_URL, new ViewHandler(ctx -> { + metricsRegistry.add("LATEST_DATA_FILES_BEFORE_ON_INSTANT", 1); List dtos = dataFileHandler.getLatestDataFilesBeforeOrOn( ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(), ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""), @@ -190,6 +244,7 @@ private void registerDataFilesAPI() { }, true)); app.get(RemoteHoodieTableFileSystemView.LATEST_DATA_FILE_ON_INSTANT_URL, new ViewHandler(ctx -> { + metricsRegistry.add("LATEST_DATA_FILE_ON_INSTANT", 1); List dtos = dataFileHandler.getLatestDataFileOn( ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(), ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""), @@ -199,6 +254,7 @@ private void registerDataFilesAPI() { }, true)); app.get(RemoteHoodieTableFileSystemView.ALL_DATA_FILES, new ViewHandler(ctx -> { + metricsRegistry.add("ALL_DATA_FILES", 1); List dtos = dataFileHandler.getAllDataFiles( ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(), ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,"")); @@ -206,6 +262,7 @@ private void registerDataFilesAPI() { }, true)); app.get(RemoteHoodieTableFileSystemView.LATEST_DATA_FILES_RANGE_INSTANT_URL, new ViewHandler(ctx -> { + metricsRegistry.add("LATEST_DATA_FILES_RANGE_INSTANT", 1); List dtos = dataFileHandler.getLatestDataFilesInRange( ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(), Arrays .asList(ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.INSTANTS_PARAM).getOrThrow().split(","))); @@ -218,6 +275,7 @@ private void registerDataFilesAPI() { */ private void registerFileSlicesAPI() { app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_SLICES_URL, new ViewHandler(ctx -> { + metricsRegistry.add("LATEST_PARTITION_SLICES", 1); List dtos = sliceHandler.getLatestFileSlices( ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(), ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,"")); @@ -225,6 +283,7 @@ private void registerFileSlicesAPI() { }, true)); app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_SLICE_URL, new ViewHandler(ctx -> { + metricsRegistry.add("LATEST_PARTITION_SLICE", 1); List dtos = sliceHandler.getLatestFileSlice( ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(), ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""), @@ -233,6 +292,7 @@ private void registerFileSlicesAPI() { }, true)); app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_UNCOMPACTED_SLICES_URL, new ViewHandler(ctx -> { + metricsRegistry.add("LATEST_PARTITION_UNCOMPACTED_SLICES", 1); List dtos = sliceHandler.getLatestUnCompactedFileSlices( ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(), ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,"")); @@ -240,6 +300,7 @@ private void registerFileSlicesAPI() { }, true)); app.get(RemoteHoodieTableFileSystemView.ALL_SLICES_URL, new ViewHandler(ctx -> { + metricsRegistry.add("ALL_SLICES", 1); List dtos = sliceHandler.getAllFileSlices( ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(), ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,"")); @@ -247,6 +308,7 @@ private void registerFileSlicesAPI() { }, true)); app.get(RemoteHoodieTableFileSystemView.LATEST_SLICES_RANGE_INSTANT_URL, new ViewHandler(ctx -> { + metricsRegistry.add("LATEST_SLICE_RANGE_INSTANT", 1); List dtos = sliceHandler.getLatestFileSliceInRange( ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(), Arrays .asList(ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.INSTANTS_PARAM).getOrThrow().split(","))); @@ -254,6 +316,7 @@ private void registerFileSlicesAPI() { }, true)); app.get(RemoteHoodieTableFileSystemView.LATEST_SLICES_MERGED_BEFORE_ON_INSTANT_URL, new ViewHandler(ctx -> { + metricsRegistry.add("LATEST_SLICES_MERGED_BEFORE_ON_INSTANT", 1); List dtos = sliceHandler.getLatestMergedFileSlicesBeforeOrOn( ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(), ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""), @@ -262,6 +325,7 @@ private void registerFileSlicesAPI() { }, true)); app.get(RemoteHoodieTableFileSystemView.LATEST_SLICES_BEFORE_ON_INSTANT_URL, new ViewHandler(ctx -> { + metricsRegistry.add("LATEST_SLICES_BEFORE_ON_INSTANT", 1); List dtos = sliceHandler.getLatestFileSlicesBeforeOrOn( ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(), ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""), @@ -273,12 +337,14 @@ private void registerFileSlicesAPI() { }, true)); app.get(RemoteHoodieTableFileSystemView.PENDING_COMPACTION_OPS, new ViewHandler(ctx -> { + metricsRegistry.add("PEDING_COMPACTION_OPS", 1); List dtos = sliceHandler.getPendingCompactionOperations( ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow()); writeValueAsString(ctx, dtos); }, true)); app.get(RemoteHoodieTableFileSystemView.ALL_FILEGROUPS_FOR_PARTITION_URL, new ViewHandler(ctx -> { + metricsRegistry.add("ALL_FILEGROUPS_FOR_PARTITION", 1); List dtos = sliceHandler.getAllFileGroups( ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(), ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,"")); @@ -286,12 +352,14 @@ private void registerFileSlicesAPI() { }, true)); app.post(RemoteHoodieTableFileSystemView.REFRESH_TABLE, new ViewHandler(ctx -> { + metricsRegistry.add("REFRESH_TABLE", 1); boolean success = sliceHandler .refreshTable(ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow()); writeValueAsString(ctx, success); }, false)); app.get(RemoteHoodieTableFileSystemView.ALL_REPLACED_FILEGROUPS_BEFORE_OR_ON, new ViewHandler(ctx -> { + metricsRegistry.add("ALL_REPLACED_FILEGROUPS_BEFORE_OR_ON", 1); List dtos = sliceHandler.getReplacedFileGroupsBeforeOrOn( ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(), ctx.queryParam(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM,""), @@ -300,6 +368,7 @@ private void registerFileSlicesAPI() { }, true)); app.get(RemoteHoodieTableFileSystemView.ALL_REPLACED_FILEGROUPS_BEFORE, new ViewHandler(ctx -> { + metricsRegistry.add("ALL_REPLACED_FILEGROUPS_BEFORE", 1); List dtos = sliceHandler.getReplacedFileGroupsBefore( ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(), ctx.queryParam(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM,""), @@ -308,6 +377,7 @@ private void registerFileSlicesAPI() { }, true)); app.get(RemoteHoodieTableFileSystemView.ALL_REPLACED_FILEGROUPS_PARTITION, new ViewHandler(ctx -> { + metricsRegistry.add("ALL_REPLACED_FILEGROUPS_PARTITION", 1); List dtos = sliceHandler.getAllReplacedFileGroups( ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(), ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,"")); @@ -315,6 +385,7 @@ private void registerFileSlicesAPI() { }, true)); app.get(RemoteHoodieTableFileSystemView.PENDING_CLUSTERING_FILEGROUPS, new ViewHandler(ctx -> { + metricsRegistry.add("PENDING_CLUSTERING_FILEGROUPS", 1); List dtos = sliceHandler.getFileGroupsInPendingClustering( ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow()); writeValueAsString(ctx, dtos); @@ -380,6 +451,12 @@ public void handle(@NotNull Context context) throws Exception { } finally { long endTs = System.currentTimeMillis(); long timeTakenMillis = endTs - beginTs; + metricsRegistry.add("TOTAL_API_TIME", timeTakenMillis); + metricsRegistry.add("TOTAL_REFRESH_TIME", refreshCheckTimeTaken); + metricsRegistry.add("TOTAL_HANDLE_TIME", handleTimeTaken); + metricsRegistry.add("TOTAL_CHECK_TIME", finalCheckTimeTaken); + metricsRegistry.add("TOTAL_API_CALLS", 1); + LOG.info(String.format( "TimeTakenMillis[Total=%d, Refresh=%d, handle=%d, Check=%d], " + "Success=%s, Query=%s, Host=%s, synced=%s", diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java index 47db1fda12193..3cae8896613b9 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java @@ -29,10 +29,14 @@ import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; import io.javalin.Javalin; +import io.javalin.core.util.JettyServerUtil; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.util.thread.QueuedThreadPool; import java.io.IOException; import java.io.Serializable; @@ -44,32 +48,40 @@ public class TimelineService { private static final Logger LOG = LogManager.getLogger(TimelineService.class); private static final int START_SERVICE_MAX_RETRIES = 16; + private static final int DEFAULT_NUM_THREADS = -1; private int serverPort; private Configuration conf; private transient FileSystem fs; private transient Javalin app = null; private transient FileSystemViewManager fsViewsManager; + private final int numThreads; + private final boolean shouldCompressOutput; + private final boolean useAsync; public int getServerPort() { return serverPort; } - public TimelineService(int serverPort, FileSystemViewManager globalFileSystemViewManager, Configuration conf) - throws IOException { + public TimelineService(int serverPort, FileSystemViewManager globalFileSystemViewManager, Configuration conf, + int numThreads, boolean compressOutput, boolean useAsync) throws IOException { this.conf = FSUtils.prepareHadoopConf(conf); this.fs = FileSystem.get(conf); this.serverPort = serverPort; this.fsViewsManager = globalFileSystemViewManager; + this.numThreads = numThreads; + this.shouldCompressOutput = compressOutput; + this.useAsync = useAsync; } public TimelineService(int serverPort, FileSystemViewManager globalFileSystemViewManager) throws IOException { - this(serverPort, globalFileSystemViewManager, new Configuration()); + this(serverPort, globalFileSystemViewManager, new Configuration(), DEFAULT_NUM_THREADS, true, false); } public TimelineService(Config config) throws IOException { this(config.serverPort, buildFileSystemViewManager(config, - new SerializableConfiguration(FSUtils.prepareHadoopConf(new Configuration())))); + new SerializableConfiguration(FSUtils.prepareHadoopConf(new Configuration()))), new Configuration(), + config.numThreads, config.compress, config.async); } public static class Config implements Serializable { @@ -97,6 +109,15 @@ public static class Config implements Serializable { @Parameter(names = {"--rocksdb-path", "-rp"}, description = "Root directory for RocksDB") public String rocksDBPath = FileSystemViewStorageConfig.DEFAULT_ROCKSDB_BASE_PATH; + @Parameter(names = {"--threads", "-t"}, description = "Number of threads to use for serving requests") + public int numThreads = DEFAULT_NUM_THREADS; + + @Parameter(names = {"--async"}, description = "Use asyncronous request processing") + public boolean async = false; + + @Parameter(names = {"--compress"}, description = "Compress output using gzip") + public boolean compress = true; + @Parameter(names = {"--help", "-h"}) public Boolean help = false; } @@ -129,8 +150,15 @@ private int startServiceOnPort(int port) throws IOException { } public int startService() throws IOException { - app = Javalin.create(); - RequestHandler requestHandler = new RequestHandler(app, conf, fsViewsManager); + final Server server = numThreads == DEFAULT_NUM_THREADS ? JettyServerUtil.defaultServer() + : new Server(new QueuedThreadPool(numThreads)); + + app = Javalin.create().server(() -> server); + if (!shouldCompressOutput) { + app.disableDynamicGzip(); + } + + RequestHandler requestHandler = new RequestHandler(app, conf, fsViewsManager, useAsync); app.get("/", ctx -> ctx.result("Hello World")); requestHandler.register(); int realServerPort = startServiceOnPort(serverPort);