From 091943461a6aa7e7dab9364813fb867f6a8771f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=96=BB=E5=85=86=E9=9D=96?= Date: Mon, 1 Aug 2022 22:03:31 +0800 Subject: [PATCH] [HUDI-3475] Initialize hudi table management module --- .../HoodieTableServiceManagerClient.java | 2 +- .../hudi/common/config/ConfigGroups.java | 3 +- .../hudi-table-service-manager/pom.xml | 430 ++++++++++++++++++ .../manager/HoodieTableServiceManager.java | 152 +++++++ .../table/service/manager/RequestHandler.java | 168 +++++++ .../service/manager/common/CommandConfig.java | 98 ++++ .../HoodieTableServiceManagerConfig.java | 293 ++++++++++++ .../service/manager/common/ServiceConfig.java | 109 +++++ .../manager/common/ServiceContext.java | 101 ++++ .../table/service/manager/entity/Action.java | 53 +++ .../manager/entity/AssistQueryEntity.java | 46 ++ .../table/service/manager/entity/Engine.java | 44 ++ .../service/manager/entity/Instance.java | 86 ++++ .../manager/entity/InstanceStatus.java | 61 +++ .../HoodieTableServiceManagerException.java | 32 ++ .../manager/executor/BaseActionExecutor.java | 81 ++++ .../manager/executor/CompactionExecutor.java | 52 +++ .../executor/submitter/ExecutionEngine.java | 55 +++ .../executor/submitter/SparkEngine.java | 209 +++++++++ .../manager/handlers/ActionHandler.java | 57 +++ .../manager/handlers/CompactionHandler.java | 66 +++ .../service/manager/service/BaseService.java | 29 ++ .../service/manager/service/CleanService.java | 77 ++++ .../manager/service/ExecutorService.java | 103 +++++ .../manager/service/MonitorService.java | 65 +++ .../manager/service/RestoreService.java | 143 ++++++ .../service/manager/service/RetryService.java | 81 ++++ .../manager/service/ScheduleService.java | 114 +++++ .../service/manager/store/MetadataStore.java | 41 ++ .../manager/store/impl/InstanceService.java | 155 +++++++ .../store/impl/RelationDBBasedStore.java | 71 +++ .../store/jdbc/HikariDataSourceFactory.java | 38 ++ .../manager/store/jdbc/JdbcMapper.java | 60 +++ 
.../store/jdbc/SqlSessionFactoryUtil.java | 82 ++++ .../service/manager/util/DateTimeUtils.java | 39 ++ .../service/manager/util/InstanceUtil.java | 34 ++ .../src/main/resources/hikariPool.properties | 21 + .../src/main/resources/mybatis-config.xml | 42 ++ .../src/main/resources/mybatis/Instance.xml | 178 ++++++++ .../main/resources/table-service-manager.sql | 44 ++ .../resources/log4j-surefire-quiet.properties | 29 ++ .../test/resources/log4j-surefire.properties | 30 ++ hudi-platform-service/pom.xml | 1 + 43 files changed, 3673 insertions(+), 2 deletions(-) create mode 100644 hudi-platform-service/hudi-table-service-manager/pom.xml create mode 100644 hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/HoodieTableServiceManager.java create mode 100644 hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/RequestHandler.java create mode 100644 hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/common/CommandConfig.java create mode 100644 hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/common/HoodieTableServiceManagerConfig.java create mode 100644 hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/common/ServiceConfig.java create mode 100644 hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/common/ServiceContext.java create mode 100644 hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/entity/Action.java create mode 100644 hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/entity/AssistQueryEntity.java create mode 100644 hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/entity/Engine.java create mode 100644 
hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/entity/Instance.java create mode 100644 hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/entity/InstanceStatus.java create mode 100644 hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/exception/HoodieTableServiceManagerException.java create mode 100644 hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/executor/BaseActionExecutor.java create mode 100644 hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/executor/CompactionExecutor.java create mode 100644 hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/executor/submitter/ExecutionEngine.java create mode 100644 hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/executor/submitter/SparkEngine.java create mode 100644 hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/handlers/ActionHandler.java create mode 100644 hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/handlers/CompactionHandler.java create mode 100644 hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/service/BaseService.java create mode 100644 hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/service/CleanService.java create mode 100644 hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/service/ExecutorService.java create mode 100644 hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/service/MonitorService.java create mode 100644 
hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/service/RestoreService.java create mode 100644 hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/service/RetryService.java create mode 100644 hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/service/ScheduleService.java create mode 100644 hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/store/MetadataStore.java create mode 100644 hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/store/impl/InstanceService.java create mode 100644 hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/store/impl/RelationDBBasedStore.java create mode 100644 hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/store/jdbc/HikariDataSourceFactory.java create mode 100644 hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/store/jdbc/JdbcMapper.java create mode 100644 hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/store/jdbc/SqlSessionFactoryUtil.java create mode 100644 hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/util/DateTimeUtils.java create mode 100644 hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/util/InstanceUtil.java create mode 100644 hudi-platform-service/hudi-table-service-manager/src/main/resources/hikariPool.properties create mode 100644 hudi-platform-service/hudi-table-service-manager/src/main/resources/mybatis-config.xml create mode 100644 hudi-platform-service/hudi-table-service-manager/src/main/resources/mybatis/Instance.xml create mode 100644 
hudi-platform-service/hudi-table-service-manager/src/main/resources/table-service-manager.sql create mode 100644 hudi-platform-service/hudi-table-service-manager/src/test/resources/log4j-surefire-quiet.properties create mode 100644 hudi-platform-service/hudi-table-service-manager/src/test/resources/log4j-surefire.properties diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTableServiceManagerClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTableServiceManagerClient.java index 780942158cfa4..5512809e2b326 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTableServiceManagerClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTableServiceManagerClient.java @@ -95,7 +95,7 @@ private String executeRequest(String requestPath, Map queryParam queryParameters.forEach(builder::addParameter); String url = builder.toString(); - LOG.info("Sending request to table management service : (" + url + ")"); + LOG.info("Sending request to table table service manager : (" + url + ")"); int timeoutMs = this.config.getConnectionTimeoutSec() * 1000; int requestRetryLimit = config.getConnectionRetryLimit(); int connectionRetryDelay = config.getConnectionRetryDelay(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigGroups.java b/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigGroups.java index 0488401f10abf..3ad0700589314 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigGroups.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigGroups.java @@ -35,7 +35,8 @@ public enum Names { METRICS("Metrics Configs"), RECORD_PAYLOAD("Record Payload Config"), KAFKA_CONNECT("Kafka Connect Configs"), - AWS("Amazon Web Services Configs"); + AWS("Amazon Web Services Configs"), + PLATFORM_SERVICE("Platform Service Configs"); public final String name; diff --git 
a/hudi-platform-service/hudi-table-service-manager/pom.xml b/hudi-platform-service/hudi-table-service-manager/pom.xml new file mode 100644 index 0000000000000..afea786e843d2 --- /dev/null +++ b/hudi-platform-service/hudi-table-service-manager/pom.xml @@ -0,0 +1,430 @@ + + + + + hudi + org.apache.hudi + 0.13.0-SNAPSHOT + + 4.0.0 + + hudi-table-service-manager + + + 8 + 8 + 3.4.6 + + + + + + org.apache.hudi + hudi-common + ${project.version} + + + org.apache.hudi + hudi-cli + ${project.version} + + + org.apache.hudi + hudi-client-common + ${project.version} + + + org.apache.hudi + hudi-utilities_${scala.binary.version} + ${project.version} + + + + + org.apache.spark + spark-core_${scala.binary.version} + compile + + + slf4j-log4j12 + org.slf4j + + + + + org.apache.spark + spark-sql_${scala.binary.version} + compile + + + + + com.fasterxml.jackson.core + jackson-annotations + + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-databind + + + + + org.apache.httpcomponents + fluent-hc + + + + io.javalin + javalin + 2.8.0 + + + org.slf4j + slf4j-api + + + + + + com.beust + jcommander + + + + + org.apache.hadoop + hadoop-common + compile + + + org.mortbay.jetty + * + + + javax.servlet.jsp + * + + + javax.servlet + * + + + tools + com.sun + + + slf4j-log4j12 + org.slf4j + + + + + org.apache.hadoop + hadoop-client + compile + + + tools + com.sun + + + javax.servlet + * + + + slf4j-log4j12 + org.slf4j + + + + + org.apache.hadoop + hadoop-auth + compile + + + slf4j-log4j12 + org.slf4j + + + + + org.apache.hadoop + hadoop-mapreduce-client-common + ${hadoop.version} + compile + + + slf4j-log4j12 + org.slf4j + + + log4j + log4j + + + + + org.apache.hadoop + hadoop-mapreduce-client-core + ${hadoop.version} + compile + + + jersey-core + com.sun.jersey + + + jersey-server + com.sun.jersey + + + jersey-client + com.sun.jersey + + + jersey-json + com.sun.jersey + + + jersey-guice + com.sun.jersey.contribs + + + slf4j-log4j12 + org.slf4j + + + + + 
+ org.apache.hudi + hudi-java-client + ${project.version} + + + + org.mybatis + mybatis + ${mybatis.version} + + + + org.projectlombok + lombok + 1.18.24 + + + + org.apache.avro + avro + ${avro.version} + + + org.slf4j + slf4j-api + + + + + + org.slf4j + slf4j-api + compile + + + + org.slf4j + slf4j-simple + ${slf4j.version} + compile + + + + + org.apache.logging.log4j + log4j-api + ${log4j2.version} + compile + + + org.apache.logging.log4j + log4j-core + ${log4j2.version} + compile + + + org.apache.logging.log4j + log4j-1.2-api + ${log4j2.version} + compile + + + org.apache.logging.log4j + log4j-slf4j-impl + ${log4j2.version} + compile + + + org.apache.logging.log4j + log4j-jcl + ${log4j2.version} + compile + + + + com.zaxxer + HikariCP + 4.0.3 + + + + mysql + mysql-connector-java + 8.0.23 + + + + com.google.code.gson + gson + 2.8.2 + + + + + com.h2database + h2 + 1.4.200 + compile + + + org.junit.jupiter + junit-jupiter-api + test + + + + + + + org.jacoco + jacoco-maven-plugin + + + org.apache.maven.plugins + maven-jar-plugin + ${maven-jar-plugin.version} + + + + test-jar + + test-compile + + + + false + + + + org.apache.rat + apache-rat-plugin + + + org.codehaus.mojo + build-helper-maven-plugin + ${build-helper-maven-plugin.version} + + + add-source + generate-sources + + add-source + + + + src/main/java + + + + + add-test-source + generate-sources + + add-test-source + + + + src/test/java + src/main/java + + + + + + + org.apache.maven.plugins + maven-shade-plugin + ${maven-shade-plugin.version} + + + package + + shade + + + ${shadeSources} + + + org.apache.hudi.table.service.manager.HoodieTableServiceManager + + + + true + + + META-INF/LICENSE + target/classes/META-INF/LICENSE + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + META-INF/services/javax.* + + + + ${project.artifactId}-${project.version} + + + + + + + + \ No newline at end of file diff --git 
a/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/HoodieTableServiceManager.java b/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/HoodieTableServiceManager.java new file mode 100644 index 0000000000000..0cdf5313a482f --- /dev/null +++ b/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/HoodieTableServiceManager.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.service.manager; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.table.service.manager.common.CommandConfig; +import org.apache.hudi.table.service.manager.common.HoodieTableServiceManagerConfig; +import org.apache.hudi.table.service.manager.service.BaseService; +import org.apache.hudi.table.service.manager.service.CleanService; +import org.apache.hudi.table.service.manager.service.ExecutorService; +import org.apache.hudi.table.service.manager.service.MonitorService; +import org.apache.hudi.table.service.manager.service.RestoreService; +import org.apache.hudi.table.service.manager.service.RetryService; +import org.apache.hudi.table.service.manager.service.ScheduleService; +import org.apache.hudi.table.service.manager.store.MetadataStore; + +import com.beust.jcommander.JCommander; +import io.javalin.Javalin; +import org.apache.hadoop.conf.Configuration; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Main class of hoodie table service manager. 
+ * + * @Experimental + * @since 0.13.0 + */ +public class HoodieTableServiceManager { + + private static final Logger LOG = LogManager.getLogger(HoodieTableServiceManager.class); + + private final int serverPort; + private final Configuration conf; + private transient Javalin app = null; + private List services; + private final MetadataStore metadataStore; + private final HoodieTableServiceManagerConfig tableServiceManagerConfig; + + public HoodieTableServiceManager(CommandConfig config) { + this.conf = FSUtils.prepareHadoopConf(new Configuration()); + this.tableServiceManagerConfig = CommandConfig.toTableServiceManagerConfig(config); + this.serverPort = config.serverPort; + this.metadataStore = initMetadataStore(); + } + + public void startService() { + app = Javalin.create(); + RequestHandler requestHandler = new RequestHandler(app, conf, metadataStore); + app.get("/", ctx -> ctx.result("Hello World")); + requestHandler.register(); + app.start(serverPort); + registerService(); + initAndStartRegisterService(); + } + + private MetadataStore initMetadataStore() { + String metadataStoreClass = tableServiceManagerConfig.getMetadataStoreClass(); + MetadataStore metadataStore = (MetadataStore) ReflectionUtils.loadClass(metadataStoreClass, + new Class[] {HoodieTableServiceManagerConfig.class}, tableServiceManagerConfig); + metadataStore.init(); + LOG.info("Finish init metastore : " + metadataStoreClass); + return metadataStore; + } + + private void registerService() { + services = new ArrayList<>(); + ExecutorService executorService = new ExecutorService(metadataStore); + services.add(executorService); + services.add(new ScheduleService(executorService, metadataStore)); + services.add(new RetryService(metadataStore)); + services.add(new RestoreService(metadataStore)); + services.add(new MonitorService()); + services.add(new CleanService()); + } + + private void initAndStartRegisterService() { + for (BaseService service : services) { + service.init(); + 
service.startService(); + } + } + + private void stopRegisterService() { + for (BaseService service : services) { + service.stop(); + } + } + + public void run() throws IOException { + startService(); + Runtime.getRuntime() + .addShutdownHook( + new Thread( + () -> { + // Use stderr here since the logger may have been reset by its JVM shutdown hook. + System.err.println( + "*** shutting down table service manager since JVM is shutting down"); + try { + HoodieTableServiceManager.this.stop(); + } catch (InterruptedException e) { + e.printStackTrace(System.err); + } + System.err.println("*** Table table service manager shut down"); + })); + } + + /** + * Stop serving requests and shutdown resources. + */ + public void stop() throws InterruptedException { + if (app != null) { + LOG.info("Stop table service manager..."); + this.app.stop(); + this.app = null; + } + stopRegisterService(); + } + + public static void main(String[] args) throws Exception { + System.out.println("SPARK_HOME = " + System.getenv("SPARK_HOME")); + final CommandConfig cfg = new CommandConfig(); + JCommander cmd = new JCommander(cfg, null, args); + if (cfg.help) { + cmd.usage(); + System.exit(1); + } + HoodieTableServiceManager service = new HoodieTableServiceManager(cfg); + service.run(); + } +} diff --git a/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/RequestHandler.java b/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/RequestHandler.java new file mode 100644 index 0000000000000..e69faecb37b17 --- /dev/null +++ b/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/RequestHandler.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.service.manager; + +import org.apache.hudi.client.HoodieTableServiceManagerClient; +import org.apache.hudi.table.service.manager.entity.Action; +import org.apache.hudi.table.service.manager.entity.Engine; +import org.apache.hudi.table.service.manager.entity.Instance; +import org.apache.hudi.table.service.manager.entity.InstanceStatus; +import org.apache.hudi.table.service.manager.handlers.ActionHandler; +import org.apache.hudi.table.service.manager.store.MetadataStore; +import org.apache.hudi.table.service.manager.util.InstanceUtil; + +import io.javalin.Context; +import io.javalin.Handler; +import io.javalin.Javalin; +import org.apache.hadoop.conf.Configuration; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.jetbrains.annotations.NotNull; + +import java.util.Locale; + +/** + * Main REST Handler class that handles and delegates calls to timeline relevant handlers. 
+ */ +public class RequestHandler { + + private static final Logger LOG = LogManager.getLogger(RequestHandler.class); + + private final Javalin app; + private final ActionHandler actionHandler; + + public RequestHandler(Javalin app, + Configuration conf, + MetadataStore metadataStore) { + this.app = app; + this.actionHandler = new ActionHandler(conf, metadataStore); + } + + public void register() { + registerCompactionAPI(); + registerClusteringAPI(); + registerCleanAPI(); + } + + /** + * Register Compaction API calls. + */ + private void registerCompactionAPI() { + app.get(HoodieTableServiceManagerClient.EXECUTE_COMPACTION, new ViewHandler(ctx -> { + for (String instant : ctx.validatedQueryParam(HoodieTableServiceManagerClient.INSTANT_PARAM).getOrThrow().split(",")) { + Instance instance = Instance.builder() + .basePath(ctx.validatedQueryParam(HoodieTableServiceManagerClient.BASEPATH_PARAM).getOrThrow()) + .dbName(ctx.validatedQueryParam(HoodieTableServiceManagerClient.DATABASE_NAME_PARAM).getOrThrow()) + .tableName(ctx.validatedQueryParam(HoodieTableServiceManagerClient.TABLE_NAME_PARAM).getOrThrow()) + .action(Action.COMPACTION.getValue()) + .instant(instant) + .executionEngine(Engine.valueOf(ctx.validatedQueryParam(HoodieTableServiceManagerClient.EXECUTION_ENGINE).getOrThrow().toUpperCase(Locale.ROOT))) + .userName(ctx.validatedQueryParam(HoodieTableServiceManagerClient.USERNAME).getOrThrow()) + .queue(ctx.validatedQueryParam(HoodieTableServiceManagerClient.QUEUE).getOrThrow()) + .resource(ctx.validatedQueryParam(HoodieTableServiceManagerClient.RESOURCE).getOrThrow()) + .parallelism(ctx.validatedQueryParam(HoodieTableServiceManagerClient.PARALLELISM).getOrThrow()) + .status(InstanceStatus.SCHEDULED.getStatus()) + .build(); + InstanceUtil.checkArgument(instance); + actionHandler.scheduleCompaction(instance); + } + })); + } + + /** + * Register Clustering API calls. 
+ */ + private void registerClusteringAPI() { + app.get(HoodieTableServiceManagerClient.EXECUTE_CLUSTERING, new ViewHandler(ctx -> { + Instance instance = Instance.builder() + .basePath(ctx.validatedQueryParam(HoodieTableServiceManagerClient.BASEPATH_PARAM).getOrThrow()) + .dbName(ctx.validatedQueryParam(HoodieTableServiceManagerClient.DATABASE_NAME_PARAM).getOrThrow()) + .tableName(ctx.validatedQueryParam(HoodieTableServiceManagerClient.TABLE_NAME_PARAM).getOrThrow()) + .action(Action.CLUSTERING.getValue()) + .instant(ctx.validatedQueryParam(HoodieTableServiceManagerClient.INSTANT_PARAM).getOrThrow()) + .executionEngine(Engine.valueOf(ctx.validatedQueryParam(HoodieTableServiceManagerClient.EXECUTION_ENGINE).getOrThrow().toUpperCase(Locale.ROOT))) + .userName(ctx.validatedQueryParam(HoodieTableServiceManagerClient.USERNAME).getOrThrow()) + .queue(ctx.validatedQueryParam(HoodieTableServiceManagerClient.QUEUE).getOrThrow()) + .resource(ctx.validatedQueryParam(HoodieTableServiceManagerClient.RESOURCE).getOrThrow()) + .parallelism(ctx.validatedQueryParam(HoodieTableServiceManagerClient.PARALLELISM).getOrThrow()) + .status(InstanceStatus.SCHEDULED.getStatus()) + .build(); + InstanceUtil.checkArgument(instance); + actionHandler.scheduleClustering(instance); + })); + } + + /** + * Register Clean API calls. 
+ */ + private void registerCleanAPI() { + app.get(HoodieTableServiceManagerClient.EXECUTE_CLEAN, new ViewHandler(ctx -> { + Instance instance = Instance.builder() + .basePath(ctx.validatedQueryParam(HoodieTableServiceManagerClient.BASEPATH_PARAM).getOrThrow()) + .dbName(ctx.validatedQueryParam(HoodieTableServiceManagerClient.DATABASE_NAME_PARAM).getOrThrow()) + .tableName(ctx.validatedQueryParam(HoodieTableServiceManagerClient.TABLE_NAME_PARAM).getOrThrow()) + .action(Action.CLEAN.getValue()) + .instant(ctx.validatedQueryParam(HoodieTableServiceManagerClient.INSTANT_PARAM).getOrThrow()) + .executionEngine(Engine.valueOf(ctx.validatedQueryParam(HoodieTableServiceManagerClient.EXECUTION_ENGINE).getOrThrow().toUpperCase(Locale.ROOT))) + .userName(ctx.validatedQueryParam(HoodieTableServiceManagerClient.USERNAME).getOrThrow()) + .queue(ctx.validatedQueryParam(HoodieTableServiceManagerClient.QUEUE).getOrThrow()) + .resource(ctx.validatedQueryParam(HoodieTableServiceManagerClient.RESOURCE).getOrThrow()) + .parallelism(ctx.validatedQueryParam(HoodieTableServiceManagerClient.PARALLELISM).getOrThrow()) + .status(InstanceStatus.SCHEDULED.getStatus()) + .build(); + InstanceUtil.checkArgument(instance); + actionHandler.scheduleClustering(instance); + })); + } + + /** + * Used for logging and performing refresh check. 
+ */ + private class ViewHandler implements Handler { + + private final Handler handler; + + ViewHandler(Handler handler) { + this.handler = handler; + } + + @Override + public void handle(@NotNull Context context) throws Exception { + boolean success = true; + long beginTs = System.currentTimeMillis(); + long handleTimeTaken = 0; + try { + long handleBeginMs = System.currentTimeMillis(); + handler.handle(context); + long handleEndMs = System.currentTimeMillis(); + handleTimeTaken = handleEndMs - handleBeginMs; + } catch (RuntimeException re) { + success = false; + LOG.error("Got runtime exception servicing request " + context.queryString(), re); + throw re; + } finally { + long endTs = System.currentTimeMillis(); + long timeTakenMillis = endTs - beginTs; + LOG.info(String.format( + "TimeTakenMillis[Total=%d, handle=%d], Success=%s, Query=%s, Host=%s", + timeTakenMillis, handleTimeTaken, success, context.queryString(), context.host())); + } + } + } +} diff --git a/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/common/CommandConfig.java b/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/common/CommandConfig.java new file mode 100644 index 0000000000000..905292667ed4f --- /dev/null +++ b/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/common/CommandConfig.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.service.manager.common; + +import com.beust.jcommander.Parameter; + +public class CommandConfig { + @Parameter(names = {"--server-port", "-p"}, description = "Server Port") + public Integer serverPort = 9092; + + @Parameter(names = {"-schedule-interval-ms"}, description = "Schedule Interval Ms") + public Long scheduleIntervalMs = 30000L; + + @Parameter(names = {"-schedule-core-executor-size"}, description = "Schedule Core Execute Size") + public Integer scheduleCoreExecuteSize = 300; + + @Parameter(names = {"-schedule-max-executor-size"}, description = "Schedule Max Execute Size") + public Integer scheduleMaxExecuteSize = 1000; + + @Parameter(names = {"-metadata-store-class"}, description = "Metadata Store Class") + public String metadataStoreClass = "org.apache.hudi.table.service.manager.store.impl.RelationDBBasedStore"; + + @Parameter(names = {"-instance-cache-enable"}, description = "Instance Cache Enable") + public boolean instanceCacheEnable = true; + + @Parameter(names = {"-instance-max-retry-num"}, description = "Instance Max Retry Num") + public Integer instanceMaxRetryNum = 3; + + @Parameter(names = {"-instance-submit-timeout-sec"}, description = "Instance Submit Timeout Sec") + public Integer instanceSubmitTimeoutSec = 600; + + @Parameter(names = {"-spark-submit-jar-path"}, description = "Spark Submit Jar Path") + public String sparkSubmitJarPath; + + @Parameter(names = {"-spark-parallelism"}, description = "Spark Parallelism") + public Integer sparkParallelism = 100; + + @Parameter(names = 
{"-spark-master"}, description = "Spark Master") + public String sparkMaster = "yarn"; + + @Parameter(names = {"-spark-executor-memory"}, description = "Spark Executor Memory") + public String sparkExecutorMemory = "4g"; + + @Parameter(names = {"-spark-driver-memory"}, description = "Spark Driver Memory") + public String sparkDriverMemory = "2g"; + + @Parameter(names = {"-spark-executor-memory-overhead"}, description = "Spark Executor Memory Overhead") + public String sparkExecutorMemoryOverhead = "200m"; + + @Parameter(names = {"-spark-executor-cores"}, description = "Spark Executor Cores") + public Integer sparkExecutorCores = 1; + + @Parameter(names = {"-spark-min-executors"}, description = "Spark Min Executors") + public Integer sparkMinExecutors = 1; + + @Parameter(names = {"-spark-max-executors"}, description = "Spark Max Executors") + public Integer sparkMaxExecutors = 100; + + @Parameter(names = {"--help", "-h"}) + public Boolean help = false; + + public static HoodieTableServiceManagerConfig toTableServiceManagerConfig(CommandConfig config) { + return HoodieTableServiceManagerConfig.newBuilder() + .withScheduleIntervalMs(config.scheduleIntervalMs) + .withScheduleCoreExecuteSize(config.scheduleCoreExecuteSize) + .withScheduleMaxExecuteSize(config.scheduleMaxExecuteSize) + .withMetadataStoreClass(config.metadataStoreClass) + .withInstanceCacheEnable(config.instanceCacheEnable) + .withInstanceMaxRetryNum(config.instanceMaxRetryNum) + .withInstanceSubmitTimeoutSec(config.instanceSubmitTimeoutSec) + .withSparkSubmitJarPath(config.sparkSubmitJarPath) + .withSparkParallelism(config.sparkParallelism) + .withSparkMaster(config.sparkMaster) + .withSparkExecutorMemory(config.sparkExecutorMemory) + .withSparkDriverMemory(config.sparkDriverMemory) + .withSparkExecutorMemoryOverhead(config.sparkExecutorMemoryOverhead) + .withSparkExecutorCores(config.sparkExecutorCores) + .withSparkMinExecutors(config.sparkMinExecutors) + .withSparkMaxExecutors(config.sparkMaxExecutors) 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.table.service.manager.common;

import org.apache.hudi.common.config.ConfigClassProperty;
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;

import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;

/**
 * Hoodie Table Service Manager Configs.
 *
 * <p>Collects the execution parameters of the table service manager: scheduling
 * thread-pool sizes, metadata store selection, instance retry/cache behavior,
 * and the resources used when submitting table service jobs to Spark.
 */
@ConfigClassProperty(name = "Hoodie table service manager configs",
    groupName = ConfigGroups.Names.PLATFORM_SERVICE,
    description = "Configure the execution parameters of the table service manager, submit job resources.")
public class HoodieTableServiceManagerConfig extends HoodieConfig {

  private HoodieTableServiceManagerConfig() {
    super();
  }

  public static final ConfigProperty<Long> SCHEDULE_INTERVAL_MS = ConfigProperty
      .key("schedule.interval.ms")
      .defaultValue(30000L)
      .sinceVersion("0.13.0")
      .withDocumentation("Interval in milliseconds at which the schedule service polls for table service instances to execute");

  public static final ConfigProperty<Integer> SCHEDULE_CORE_EXECUTE_SIZE = ConfigProperty
      .key("schedule.core.executor.size")
      .defaultValue(300)
      .sinceVersion("0.13.0")
      .withDocumentation("Core size of the thread pool that executes scheduled table service instances");

  public static final ConfigProperty<Integer> SCHEDULE_MAX_EXECUTE_SIZE = ConfigProperty
      .key("schedule.max.executor.size")
      .defaultValue(1000)
      .sinceVersion("0.13.0")
      .withDocumentation("Maximum size of the thread pool that executes scheduled table service instances");

  public static final ConfigProperty<String> METADATA_STORE_CLASS = ConfigProperty
      .key("metadata.store.class")
      .defaultValue("org.apache.hudi.table.service.manager.store.impl.RelationDBBasedStore")
      .sinceVersion("0.13.0")
      .withDocumentation("Fully qualified class name of the metadata store used to persist table service instances");

  public static final ConfigProperty<Boolean> INSTANCE_CACHE_ENABLE = ConfigProperty
      .key("instance.cache.enable")
      .defaultValue(true)
      .sinceVersion("0.13.0")
      .withDocumentation("Whether to cache pending instances in memory to avoid duplicate submissions");

  public static final ConfigProperty<Integer> INSTANCE_MAX_RETRY_NUM = ConfigProperty
      .key("instance.max.retry.num")
      .defaultValue(3)
      .sinceVersion("0.13.0")
      .withDocumentation("Maximum number of retries for a failed table service instance");

  public static final ConfigProperty<Integer> INSTANCE_SUBMIT_TIMEOUT_SEC = ConfigProperty
      .key("instance.submit.timeout.seconds")
      .defaultValue(600)
      .sinceVersion("0.13.0")
      .withDocumentation("Timeout in seconds for submitting a table service instance job");

  /**
   * Spark Submit Config.
   */

  public static final ConfigProperty<String> SPARK_SUBMIT_JAR_PATH = ConfigProperty
      .key("spark.submit.jar.path")
      .noDefaultValue()
      .sinceVersion("0.13.0")
      .withDocumentation("Path of the jar passed to spark-submit when launching table service jobs");

  public static final ConfigProperty<Integer> SPARK_PARALLELISM = ConfigProperty
      .key("spark.parallelism")
      .defaultValue(100)
      .sinceVersion("0.13.0")
      .withDocumentation("Parallelism of the submitted Spark job");

  public static final ConfigProperty<String> SPARK_MASTER = ConfigProperty
      .key("spark.master")
      .defaultValue("yarn")
      .sinceVersion("0.13.0")
      .withDocumentation("Spark master used to submit table service jobs");

  public static final ConfigProperty<String> SPARK_EXECUTOR_MEMORY = ConfigProperty
      .key("spark.executor.memory")
      .defaultValue("4g")
      .sinceVersion("0.13.0")
      .withDocumentation("Executor memory of the submitted Spark job");

  public static final ConfigProperty<String> SPARK_DRIVER_MEMORY = ConfigProperty
      .key("spark.driver.memory")
      .defaultValue("2g")
      .sinceVersion("0.13.0")
      .withDocumentation("Driver memory of the submitted Spark job");

  public static final ConfigProperty<String> SPARK_EXECUTOR_MEMORY_OVERHEAD = ConfigProperty
      .key("spark.executor.memory.overhead")
      .defaultValue("200m")
      .sinceVersion("0.13.0")
      .withDocumentation("Executor memory overhead of the submitted Spark job");

  public static final ConfigProperty<Integer> SPARK_EXECUTOR_CORES = ConfigProperty
      .key("spark.executor.cores")
      .defaultValue(1)
      .sinceVersion("0.13.0")
      .withDocumentation("Number of cores per executor of the submitted Spark job");

  public static final ConfigProperty<Integer> SPARK_MIN_EXECUTORS = ConfigProperty
      .key("spark.min.executors")
      .defaultValue(1)
      .sinceVersion("0.13.0")
      .withDocumentation("Minimum number of executors of the submitted Spark job");

  public static final ConfigProperty<Integer> SPARK_MAX_EXECUTORS = ConfigProperty
      .key("spark.max.executors")
      .defaultValue(100)
      .sinceVersion("0.13.0")
      .withDocumentation("Maximum number of executors of the submitted Spark job");

  public Long getScheduleIntervalMs() {
    return getLong(SCHEDULE_INTERVAL_MS);
  }

  public int getScheduleCoreExecuteSize() {
    return getInt(SCHEDULE_CORE_EXECUTE_SIZE);
  }

  public int getScheduleMaxExecuteSize() {
    return getInt(SCHEDULE_MAX_EXECUTE_SIZE);
  }

  public String getMetadataStoreClass() {
    return getString(METADATA_STORE_CLASS);
  }

  public boolean getInstanceCacheEnable() {
    return getBoolean(INSTANCE_CACHE_ENABLE);
  }

  public int getInstanceMaxRetryNum() {
    return getInt(INSTANCE_MAX_RETRY_NUM);
  }

  public int getInstanceSubmitTimeoutSec() {
    return getInt(INSTANCE_SUBMIT_TIMEOUT_SEC);
  }

  public String getSparkSubmitJarPath() {
    return getString(SPARK_SUBMIT_JAR_PATH);
  }

  public int getSparkParallelism() {
    return getInt(SPARK_PARALLELISM);
  }

  public String getSparkMaster() {
    return getString(SPARK_MASTER);
  }

  public String getSparkExecutorMemory() {
    return getString(SPARK_EXECUTOR_MEMORY);
  }

  public String getSparkDriverMemory() {
    return getString(SPARK_DRIVER_MEMORY);
  }

  public String getSparkExecutorMemoryOverhead() {
    return getString(SPARK_EXECUTOR_MEMORY_OVERHEAD);
  }

  public int getSparkExecutorCores() {
    return getInt(SPARK_EXECUTOR_CORES);
  }

  public int getSparkMinExecutors() {
    return getInt(SPARK_MIN_EXECUTORS);
  }

  public int getSparkMaxExecutors() {
    return getInt(SPARK_MAX_EXECUTORS);
  }

  public static HoodieTableServiceManagerConfig.Builder newBuilder() {
    return new HoodieTableServiceManagerConfig.Builder();
  }

  /**
   * Fluent builder for {@link HoodieTableServiceManagerConfig}.
   */
  public static class Builder {

    private final HoodieTableServiceManagerConfig tableServiceManagerConfig = new HoodieTableServiceManagerConfig();

    public HoodieTableServiceManagerConfig.Builder fromFile(File propertiesFile) throws IOException {
      try (FileReader reader = new FileReader(propertiesFile)) {
        this.tableServiceManagerConfig.getProps().load(reader);
        return this;
      }
    }

    public HoodieTableServiceManagerConfig.Builder fromProperties(Properties props) {
      this.tableServiceManagerConfig.getProps().putAll(props);
      return this;
    }

    public HoodieTableServiceManagerConfig.Builder withScheduleIntervalMs(long scheduleIntervalMs) {
      tableServiceManagerConfig.setValue(SCHEDULE_INTERVAL_MS, String.valueOf(scheduleIntervalMs));
      return this;
    }

    public HoodieTableServiceManagerConfig.Builder withScheduleCoreExecuteSize(int scheduleCoreExecuteSize) {
      tableServiceManagerConfig.setValue(SCHEDULE_CORE_EXECUTE_SIZE, String.valueOf(scheduleCoreExecuteSize));
      return this;
    }

    public HoodieTableServiceManagerConfig.Builder withScheduleMaxExecuteSize(int scheduleMaxExecuteSize) {
      tableServiceManagerConfig.setValue(SCHEDULE_MAX_EXECUTE_SIZE, String.valueOf(scheduleMaxExecuteSize));
      return this;
    }

    public HoodieTableServiceManagerConfig.Builder withMetadataStoreClass(String metadataStoreClass) {
      tableServiceManagerConfig.setValue(METADATA_STORE_CLASS, metadataStoreClass);
      return this;
    }

    public HoodieTableServiceManagerConfig.Builder withInstanceCacheEnable(boolean instanceCacheEnable) {
      tableServiceManagerConfig.setValue(INSTANCE_CACHE_ENABLE, String.valueOf(instanceCacheEnable));
      return this;
    }

    public HoodieTableServiceManagerConfig.Builder withInstanceMaxRetryNum(int instanceMaxRetryNum) {
      tableServiceManagerConfig.setValue(INSTANCE_MAX_RETRY_NUM, String.valueOf(instanceMaxRetryNum));
      return this;
    }

    public HoodieTableServiceManagerConfig.Builder withInstanceSubmitTimeoutSec(int instanceSubmitTimeoutSec) {
      tableServiceManagerConfig.setValue(INSTANCE_SUBMIT_TIMEOUT_SEC, String.valueOf(instanceSubmitTimeoutSec));
      return this;
    }

    public HoodieTableServiceManagerConfig.Builder withSparkSubmitJarPath(String sparkSubmitJarPath) {
      // Set directly (not via String.valueOf) so a null value is not coerced to the literal "null".
      tableServiceManagerConfig.setValue(SPARK_SUBMIT_JAR_PATH, sparkSubmitJarPath);
      return this;
    }

    public HoodieTableServiceManagerConfig.Builder withSparkParallelism(int sparkParallelism) {
      tableServiceManagerConfig.setValue(SPARK_PARALLELISM, String.valueOf(sparkParallelism));
      return this;
    }

    public HoodieTableServiceManagerConfig.Builder withSparkMaster(String sparkMaster) {
      tableServiceManagerConfig.setValue(SPARK_MASTER, sparkMaster);
      return this;
    }

    public HoodieTableServiceManagerConfig.Builder withSparkExecutorMemory(String sparkExecutorMemory) {
      tableServiceManagerConfig.setValue(SPARK_EXECUTOR_MEMORY, sparkExecutorMemory);
      return this;
    }

    public HoodieTableServiceManagerConfig.Builder withSparkDriverMemory(String sparkDriverMemory) {
      tableServiceManagerConfig.setValue(SPARK_DRIVER_MEMORY, sparkDriverMemory);
      return this;
    }

    public HoodieTableServiceManagerConfig.Builder withSparkExecutorMemoryOverhead(String sparkExecutorMemoryOverhead) {
      tableServiceManagerConfig.setValue(SPARK_EXECUTOR_MEMORY_OVERHEAD, sparkExecutorMemoryOverhead);
      return this;
    }

    public HoodieTableServiceManagerConfig.Builder withSparkExecutorCores(int sparkExecutorCores) {
      tableServiceManagerConfig.setValue(SPARK_EXECUTOR_CORES, String.valueOf(sparkExecutorCores));
      return this;
    }

    public HoodieTableServiceManagerConfig.Builder withSparkMinExecutors(int sparkMinExecutors) {
      tableServiceManagerConfig.setValue(SPARK_MIN_EXECUTORS, String.valueOf(sparkMinExecutors));
      return this;
    }

    public HoodieTableServiceManagerConfig.Builder withSparkMaxExecutors(int sparkMaxExecutors) {
      tableServiceManagerConfig.setValue(SPARK_MAX_EXECUTORS, String.valueOf(sparkMaxExecutors));
      return this;
    }

    public HoodieTableServiceManagerConfig build() {
      // Fill in defaults for any property not explicitly set above.
      tableServiceManagerConfig.setDefaults(HoodieTableServiceManagerConfig.class.getName());
      return tableServiceManagerConfig;
    }
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.table.service.manager.common;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Map;
import java.util.Properties;

/**
 * Singleton service configuration populated from environment variables.
 *
 * <p>At construction, every environment variable whose lower-cased name starts with
 * {@code hoodie_table_management_} is imported as a property with underscores
 * replaced by dots, e.g. {@code HOODIE_TABLE_MANAGEMENT_SPARK_MASTER} becomes
 * {@code hoodie.table.management.spark.master}.
 */
public class ServiceConfig extends Properties {

  private static final Logger LOG = LogManager.getLogger(ServiceConfig.class);
  private static final String HOODIE_ENV_PROPS_PREFIX = "hoodie_table_management_";

  private static final ServiceConfig CONFIG = new ServiceConfig();

  /**
   * Constructor: loads matching environment variables into this property set.
   */
  private ServiceConfig() {
    LOG.info("Start init ServiceConfig");
    Map<String, String> envs = System.getenv();
    for (Map.Entry<String, String> env : envs.entrySet()) {
      if (env.getKey().toLowerCase().startsWith(HOODIE_ENV_PROPS_PREFIX)) {
        String key = env.getKey().toLowerCase().replaceAll("_", ".");
        String value = env.getValue().trim();
        setProperty(key, value);
        LOG.info("Set property " + key + " to " + value);
      }
    }
    LOG.info("Finish init ServiceConfig");
  }

  /** Returns the configured value, or the variable's default when unset. */
  public String getString(ServiceConfVars confVars) {
    return this.getProperty(confVars.key(), confVars.defVal());
  }

  public void setString(ServiceConfVars confVars, String value) {
    this.setProperty(confVars.key(), value);
  }

  public Boolean getBool(ServiceConfVars confVars) {
    return Boolean.valueOf(this.getProperty(confVars.key(), confVars.defVal()));
  }

  public int getInt(ServiceConfVars confVars) {
    return Integer.parseInt(this.getProperty(confVars.key(), confVars.defVal()));
  }

  public static ServiceConfig getInstance() {
    return CONFIG;
  }

  /**
   * Known configuration variables with their property keys and defaults.
   */
  public enum ServiceConfVars {
    CompactionScheduleWaitInterval("hoodie.table.management.schedule.wait.interval", "30000"),
    MaxFailTolerance("hoodie.table.management.max.fail.tolerance", "5"),
    MaxRetryNum("hoodie.table.management.instance.max.retry", "3"),
    // Default must match this module's actual package layout
    // (org.apache.hudi.table.service.manager), not the old table.management path.
    MetadataStoreClass("hoodie.table.management.metadata.store.class",
        "org.apache.hudi.table.service.manager.store.impl.RelationDBBasedStore"),
    CompactionCacheEnable("hoodie.table.management.compaction.cache.enable", "true"),
    SubmitJobTimeoutSec("hoodie.table.management.instance.submit.timeout.seconds", "600"),
    SparkSubmitJarPath("hoodie.table.management.submit.jar.path", "/tmp/hoodie_submit_jar/spark/"),
    SparkParallelism("hoodie.table.management.spark.parallelism", "100"),
    SparkMaster("hoodie.table.management.spark.master", "yarn"),
    SparkSpeculation("hoodie.table.management.spark.speculation", "false"),
    ExecutorMemory("hoodie.table.management.executor.memory", "4g"),
    DriverMemory("hoodie.table.management.driver.memory", "2g"),
    ExecutorMemoryOverhead("hoodie.table.management.executor.memory.overhead", "200m"),
    ExecutorCores("hoodie.table.management.executor.cores", "1"),
    MinExecutors("hoodie.table.management.min.executors", "1"),
    // NOTE(review): default 10 here vs 100 in HoodieTableServiceManagerConfig — confirm which is intended.
    MaxExecutors("hoodie.table.management.max.executors", "10"),
    CoreExecuteSize("hoodie.table.management.core.executor.pool.size", "300"),
    MaxExecuteSize("hoodie.table.management.max.executor.pool.size", "1000");

    private final String key;
    private final String defaultVal;

    ServiceConfVars(String key, String defaultVal) {
      this.key = key;
      this.defaultVal = defaultVal;
    }

    public String key() {
      return this.key;
    }

    public String defVal() {
      return this.defaultVal;
    }
  }

}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.table.service.manager.common;

import org.apache.hudi.table.service.manager.store.impl.InstanceService;
import org.apache.hudi.table.service.manager.store.jdbc.JdbcMapper;

import org.apache.hadoop.yarn.client.api.YarnClient;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Process-wide shared state of the table service manager: the set of instances
 * currently executing, the set of pending instants, and lazily initialized
 * singletons (JDBC mapper, instance DAO, YARN client).
 */
public class ServiceContext {

  // Maps instance record key -> identifier of the thread executing it.
  private static final ConcurrentHashMap<String, String> runningInstance = new ConcurrentHashMap<>();

  public static void addRunningInstance(String instanceIdentifier, String threadIdentifier) {
    runningInstance.put(instanceIdentifier, threadIdentifier);
  }

  public static void removeRunningInstance(String instanceIdentifier) {
    runningInstance.remove(instanceIdentifier);
  }

  public static int getRunningInstanceNum() {
    return runningInstance.size();
  }

  /** Human-readable description of every currently running instance. */
  public static List<String> getRunningInstanceInfo() {
    List<String> runningInfos = new ArrayList<>();
    for (Map.Entry<String, String> instance : runningInstance.entrySet()) {
      runningInfos.add("instance " + instance.getKey() + " execution on " + instance.getValue());
    }
    return runningInfos;
  }

  // Maps pending instant key -> timestamp (millis) of the last refresh.
  private static final ConcurrentHashMap<String, Long> pendingInstances = new ConcurrentHashMap<>();

  public static boolean containsPendingInstant(String key) {
    return pendingInstances.containsKey(key);
  }

  public static void refreshPendingInstant(String key) {
    pendingInstances.put(key, System.currentTimeMillis());
  }

  public static void removePendingInstant(String key) {
    pendingInstances.remove(key);
  }

  public static ConcurrentHashMap<String, Long> getPendingInstances() {
    return pendingInstances;
  }

  public static InstanceService getInstanceDao() {
    return ServiceContextHolder.INSTANCE_DAO;
  }

  public static YarnClient getYarnClient() {
    return ServiceContextHolder.YARN_CLIENT;
  }

  public static JdbcMapper getJdbcMapper() {
    return ServiceContextHolder.JDBC_MAPPER;
  }

  /**
   * Initialization-on-demand holder: the singletons below are created only when
   * first accessed, after the JDBC driver has been verified to be on the classpath.
   */
  private static class ServiceContextHolder {
    // Make sure we have the jdbc driver in classpath.
    // NOTE(review): this is the legacy Connector/J 5.x class name; Connector/J 8+
    // uses com.mysql.cj.jdbc.Driver — confirm the bundled driver version.
    private static final String DRIVER_NAME = "com.mysql.jdbc.Driver";

    static {
      try {
        Class.forName(DRIVER_NAME);
      } catch (ClassNotFoundException e) {
        throw new IllegalStateException("Could not find " + DRIVER_NAME + " in classpath. ", e);
      }
    }

    private static final JdbcMapper JDBC_MAPPER = new JdbcMapper();
    private static final InstanceService INSTANCE_DAO = new InstanceService();
    private static final YarnClient YARN_CLIENT = YarnClient.createYarnClient();
  }

}
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.service.manager.entity; + +public enum Action { + COMPACTION(0), + CLUSTERING(1), + CLEAN(2); + + private final int value; + + Action(int value) { + this.value = value; + } + + public int getValue() { + return this.value; + } + + public static void checkActionType(Instance instance) { + for (Action action : Action.values()) { + if (action.getValue() == instance.getAction()) { + return; + } + } + throw new RuntimeException("Invalid action type: " + instance); + } + + public static Action getAction(int actionValue) { + for (Action action : Action.values()) { + if (action.getValue() == actionValue) { + return action; + } + } + throw new RuntimeException("Invalid instance action: " + actionValue); + } +} diff --git a/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/entity/AssistQueryEntity.java b/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/entity/AssistQueryEntity.java new file mode 100644 index 0000000000000..c9471a2725989 --- /dev/null +++ b/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/entity/AssistQueryEntity.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.table.service.manager.entity;

import org.apache.hudi.table.service.manager.util.DateTimeUtils;

import lombok.Getter;

import java.util.Date;

/**
 * Parameter holder for auxiliary instance queries against the metadata store,
 * filtering by retry count or by status within a time window.
 */
@Getter
public class AssistQueryEntity {

  // Maximum retry count filter.
  private int maxRetry;

  // Lower bound of the query time window; defaults to three days ago.
  private Date queryStartTime = DateTimeUtils.addDay(-3);

  // Instance status filter (see InstanceStatus codes).
  private int status;

  public AssistQueryEntity(int maxRetry) {
    this.maxRetry = maxRetry;
  }

  public AssistQueryEntity(int status, Date queryStartTime) {
    this.status = status;
    this.queryStartTime = queryStartTime;
  }

}
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.service.manager.entity; + +public enum Engine { + + SPARK(0), + FLINK(1); + + private final int value; + + Engine(int value) { + this.value = value; + } + + public int getValue() { + return this.value; + } + + public static void checkEngineType(Instance instance) { + for (Engine engine : Engine.values()) { + if (engine.equals(instance.getExecutionEngine())) { + return; + } + } + throw new RuntimeException("Invalid engine type: " + instance); + } +} diff --git a/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/entity/Instance.java b/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/entity/Instance.java new file mode 100644 index 0000000000000..32e9725cc3681 --- /dev/null +++ b/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/entity/Instance.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.table.service.manager.entity;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;

import java.util.Date;

/**
 * A single table service run (compaction/clustering/clean) tracked by the
 * manager and persisted in the metadata store.
 */
@Builder
@Getter
@Setter
@ToString
@NoArgsConstructor
@AllArgsConstructor
public class Instance {

  // Primary key in the metadata store.
  private long id;

  private String dbName;

  private String tableName;

  private String basePath;

  private Engine executionEngine;

  private String userName;

  private String queue;

  private String resource;

  private String parallelism;

  // Hudi timeline instant this run operates on.
  private String instant;

  // Integer code, see Action.
  private int action;

  // Integer code, see InstanceStatus.
  private int status;

  // How many times this instance has been (re)executed.
  private int runTimes;

  // YARN application id once submitted.
  private String applicationId;

  private Date scheduleTime;

  private Date createTime;

  private Date updateTime;

  private boolean isDeleted;

  /** Key identifying this run at its current status: db.table.instant.status. */
  public String getIdentifier() {
    return String.join(".", dbName, tableName, instant, String.valueOf(status));
  }

  /** Run-state summary: db.table.instant.status.runTimes.updateTime. */
  public String getInstanceRunStatus() {
    return String.join(".", dbName, tableName, instant,
        String.valueOf(status), String.valueOf(runTimes), String.valueOf(updateTime));
  }

  /** Stable key for this run independent of status: db.table.instant. */
  public String getRecordKey() {
    return String.join(".", dbName, tableName, instant);
  }
}
+ */ + +package org.apache.hudi.table.service.manager.entity; + +public enum InstanceStatus { + + SCHEDULED(0, "scheduled"), + RUNNING(1, "running"), + FAILED(2, "failed"), + INVALID(3, "invalid"), + COMPLETED(4, "completed"); + + private int status; + private String desc; + + InstanceStatus(int status, String desc) { + this.status = status; + this.desc = desc; + } + + public int getStatus() { + return status; + } + + public void setStatus(int status) { + this.status = status; + } + + public String getDesc() { + return desc; + } + + public void setDesc(String desc) { + this.desc = desc; + } + + public static InstanceStatus getInstance(int status) { + for (InstanceStatus instanceStatus : InstanceStatus.values()) { + if (instanceStatus.getStatus() == status) { + return instanceStatus; + } + } + throw new RuntimeException("Invalid instance status: " + status); + } +} diff --git a/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/exception/HoodieTableServiceManagerException.java b/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/exception/HoodieTableServiceManagerException.java new file mode 100644 index 0000000000000..5d0e5c65b66fb --- /dev/null +++ b/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/exception/HoodieTableServiceManagerException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.service.manager.exception; + +import org.apache.hudi.exception.HoodieException; + +public class HoodieTableServiceManagerException extends HoodieException { + + public HoodieTableServiceManagerException(String msg) { + super(msg); + } + + public HoodieTableServiceManagerException(String msg, Throwable e) { + super(msg, e); + } +} diff --git a/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/executor/BaseActionExecutor.java b/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/executor/BaseActionExecutor.java new file mode 100644 index 0000000000000..21e28e444bcf7 --- /dev/null +++ b/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/executor/BaseActionExecutor.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.service.manager.executor; + +import org.apache.hudi.table.service.manager.common.HoodieTableServiceManagerConfig; +import org.apache.hudi.table.service.manager.common.ServiceConfig; +import org.apache.hudi.table.service.manager.common.ServiceContext; +import org.apache.hudi.table.service.manager.entity.Instance; +import org.apache.hudi.table.service.manager.executor.submitter.ExecutionEngine; +import org.apache.hudi.table.service.manager.executor.submitter.SparkEngine; +import org.apache.hudi.table.service.manager.store.impl.InstanceService; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public abstract class BaseActionExecutor implements Runnable { + + private static final Logger LOG = LogManager.getLogger(BaseActionExecutor.class); + + protected InstanceService instanceDao; + protected Instance instance; + protected ExecutionEngine engine; + protected HoodieTableServiceManagerConfig config; + + public BaseActionExecutor(Instance instance, HoodieTableServiceManagerConfig config) { + this.instance = instance; + this.config = config; + this.instanceDao = ServiceContext.getInstanceDao(); + switch (instance.getExecutionEngine()) { + case SPARK: + engine = new SparkEngine(instanceDao, config); + break; + case FLINK: + default: + throw new IllegalStateException("Unexpected value: " + instance.getExecutionEngine()); + } + } + + @Override + public void run() { + ServiceContext.addRunningInstance(instance.getRecordKey(), getThreadIdentifier()); + try { + doExecute(); + } finally 
{ + ServiceContext.removeRunningInstance(instance.getRecordKey()); + if (config.getInstanceCacheEnable()) { + ServiceContext.removePendingInstant(instance.getRecordKey()); + } + } + } + + public abstract void doExecute(); + + public abstract String getJobName(Instance instance); + + public String getThreadIdentifier() { + return Thread.currentThread().getId() + "." + Thread.currentThread().getName() + "." + + Thread.currentThread().getState(); + } + + @Override + public String toString() { + return this.getClass().getName() + ", instance: " + instance.getIdentifier(); + } +} diff --git a/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/executor/CompactionExecutor.java b/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/executor/CompactionExecutor.java new file mode 100644 index 0000000000000..ecb7bc5eeaed3 --- /dev/null +++ b/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/executor/CompactionExecutor.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.service.manager.executor; + +import org.apache.hudi.table.service.manager.common.HoodieTableServiceManagerConfig; +import org.apache.hudi.table.service.manager.entity.Instance; +import org.apache.hudi.table.service.manager.entity.InstanceStatus; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class CompactionExecutor extends BaseActionExecutor { + + private static final Logger LOG = LogManager.getLogger(CompactionExecutor.class); + + public static final String COMPACT_JOB_NAME = "Hoodie compact %s.%s %s"; + + public CompactionExecutor(Instance instance, HoodieTableServiceManagerConfig config) { + super(instance, config); + } + + @Override + public void doExecute() { + String jobName = getJobName(instance); + LOG.info("Start exec : " + jobName); + instance.setStatus(InstanceStatus.RUNNING.getStatus()); + instanceDao.updateStatus(instance); + engine.execute(jobName, instance); + } + + @Override + public String getJobName(Instance instance) { + return String.format(COMPACT_JOB_NAME, instance.getDbName(), instance.getTableName(), + instance.getInstant()); + } +} diff --git a/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/executor/submitter/ExecutionEngine.java b/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/executor/submitter/ExecutionEngine.java new file mode 100644 index 0000000000000..cff52053dd494 --- /dev/null +++ b/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/executor/submitter/ExecutionEngine.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.service.manager.executor.submitter; + +import org.apache.hudi.table.service.manager.common.HoodieTableServiceManagerConfig; +import org.apache.hudi.table.service.manager.entity.Instance; +import org.apache.hudi.table.service.manager.exception.HoodieTableServiceManagerException; +import org.apache.hudi.table.service.manager.store.impl.InstanceService; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Map; + +public abstract class ExecutionEngine { + + private static final Logger LOG = LogManager.getLogger(ExecutionEngine.class); + + protected final InstanceService instanceDao; + protected final HoodieTableServiceManagerConfig config; + + public ExecutionEngine(InstanceService instanceDao, HoodieTableServiceManagerConfig config) { + this.instanceDao = instanceDao; + this.config = config; + } + + public void execute(String jobName, Instance instance) throws HoodieTableServiceManagerException { + try { + LOG.info("Submitting instance {}:{}", jobName, instance.getIdentifier()); + launchJob(jobName, instance); + } catch (Exception e) { + throw new HoodieTableServiceManagerException("Failed submit instance " + instance.getIdentifier(), e); + } + } + + public abstract void launchJob(String jobName, Instance instance); + + public abstract Map getJobParams(Instance instance); +} diff --git 
a/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/executor/submitter/SparkEngine.java b/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/executor/submitter/SparkEngine.java new file mode 100644 index 0000000000000..b88ddb5f713f0 --- /dev/null +++ b/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/executor/submitter/SparkEngine.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.service.manager.executor.submitter; + +import org.apache.hudi.cli.commands.SparkMain; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.table.service.manager.common.HoodieTableServiceManagerConfig; +import org.apache.hudi.table.service.manager.common.ServiceConfig; +import org.apache.hudi.table.service.manager.entity.Instance; +import org.apache.hudi.table.service.manager.entity.InstanceStatus; +import org.apache.hudi.table.service.manager.exception.HoodieTableServiceManagerException; +import org.apache.hudi.table.service.manager.store.impl.InstanceService; + +import org.apache.spark.launcher.SparkAppHandle; +import org.apache.spark.launcher.SparkLauncher; +import org.apache.spark.util.Utils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.File; +import java.net.URISyntaxException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +import static org.apache.spark.launcher.SparkAppHandle.State.FINISHED; +import static org.apache.spark.launcher.SparkAppHandle.State.SUBMITTED; + +public class SparkEngine extends ExecutionEngine { + + private static final Logger LOG = LogManager.getLogger(SparkEngine.class); + + public SparkEngine(InstanceService instanceDao, HoodieTableServiceManagerConfig config) { + super(instanceDao, config); + } + + @Override + public Map getJobParams(Instance instance) { + Map sparkParams = new HashMap<>(); + String parallelism = StringUtils.isNullOrEmpty(instance.getParallelism()) + ? 
String.valueOf(config.getSparkMaxExecutors()) + : instance.getParallelism(); + + sparkParams.put("spark.dynamicAllocation.maxExecutors", parallelism); + sparkParams.put("spark.dynamicAllocation.minExecutors", String.valueOf(config.getSparkMinExecutors())); + sparkParams.put("spark.speculation", "false"); + String driverResource; + String executorResource; + String resource = instance.getResource().trim(); + + if (StringUtils.isNullOrEmpty(resource)) { + driverResource = config.getSparkDriverMemory(); + executorResource = config.getSparkExecutorMemory(); + } else { + String[] resourceArray = resource.split(":"); + if (resourceArray.length == 1) { + driverResource = resourceArray[0]; + executorResource = resourceArray[0]; + } else if (resourceArray.length == 2) { + driverResource = resourceArray[0]; + executorResource = resourceArray[1]; + } else { + throw new RuntimeException( + "Invalid conf: " + instance.getIdentifier() + ", resource: " + resource); + } + } + + sparkParams.put("spark.executor.cores", String.valueOf(config.getSparkExecutorCores())); + sparkParams.put("spark.executor.memory", executorResource); + sparkParams.put("spark.driver.memory", driverResource); + sparkParams.put("spark.executor.memoryOverhead", config.getSparkExecutorMemoryOverhead()); + + return sparkParams; + } + + @Override + public void launchJob(String jobName, Instance instance) throws HoodieTableServiceManagerException { + String sparkPropertiesPath = + Utils.getDefaultPropertiesFile(scala.collection.JavaConversions.propertiesAsScalaMap(System.getProperties())); + SparkLauncher sparkLauncher; + try { + sparkLauncher = initLauncher(sparkPropertiesPath, instance); + } catch (URISyntaxException e) { + LOG.error("Failed to init spark launcher"); + throw new HoodieTableServiceManagerException("Failed to init spark launcher", e); + } + + try { + final boolean[] isFinished = new boolean[1]; + SparkAppHandle sparkAppHandle = sparkLauncher.startApplication(new SparkAppHandle.Listener() { + 
@Override + public void stateChanged(SparkAppHandle handle) { + LOG.info("****************************"); + LOG.info("State Changed [state={}]", handle.getState()); + LOG.info("AppId={}", handle.getAppId()); + + if (handle.getState() == SUBMITTED) { + LOG.info("Submit job in application id: " + handle.getAppId()); + instance.setApplicationId(handle.getAppId()); + instanceDao.updateExecutionInfo(instance); + } else if (isCompleted(handle.getState())) { + isFinished[0] = true; + LOG.info("Completed job in state: " + handle.getState()); + if (handle.getState() == FINISHED) { + instance.setStatus(InstanceStatus.COMPLETED.getStatus()); + } else { + instance.setStatus(InstanceStatus.FAILED.getStatus()); + } + instanceDao.updateStatus(instance); + } + } + + @Override + public void infoChanged(SparkAppHandle handle) { + // no OP + } + }); + + while (!isFinished[0]) { + TimeUnit.SECONDS.sleep(5); + LOG.info("Waiting for job {} finished.", jobName); + } + + LOG.info("Stop job when job is finished."); + sparkAppHandle.kill(); + } catch (Throwable e) { + LOG.error("Failed to launcher spark process"); + throw new HoodieTableServiceManagerException("Failed to init spark launcher", e); + } + } + + private boolean isCompleted(SparkAppHandle.State state) { + switch (state) { + case FINISHED: + case FAILED: + case KILLED: + case LOST: + return true; + } + return false; + } + + private SparkLauncher initLauncher(String propertiesFile, Instance instance) throws URISyntaxException { + String currentJar = StringUtils.isNullOrEmpty(config.getSparkSubmitJarPath()) + ? 
config.getSparkSubmitJarPath() + : SparkEngine.class.getProtectionDomain().getCodeSource().getLocation().getFile(); + System.out.println("currentJar = " + currentJar); + Map env = System.getenv(); + String master = config.getSparkMaster(); + + SparkLauncher sparkLauncher = + new SparkLauncher(env) + .setDeployMode("cluster") + .setMaster(master) + .setAppResource(currentJar) + .setMainClass(SparkMain.class.getName()); + + if (!StringUtils.isNullOrEmpty(propertiesFile)) { + sparkLauncher.setPropertiesFile(propertiesFile); + } + + File libDirectory = new File(new File(currentJar).getParent(), "lib"); + // This lib directory may be not required, such as providing libraries through a bundle jar + if (libDirectory.exists()) { + Arrays.stream(Objects.requireNonNull(libDirectory.list())).forEach(library -> { + if (!library.startsWith("hadoop-hdfs")) { + sparkLauncher.addJar(new File(libDirectory, library).getAbsolutePath()); + } + }); + } + + Map jobParams = getJobParams(instance); + + for (Map.Entry entry : jobParams.entrySet()) { + sparkLauncher.setConf(entry.getKey(), entry.getValue()); + } + + sparkLauncher.addSparkArg("--queue", instance.getQueue()); + String sparkMemory = jobParams.get("spark.executor.memory"); + String parallelism = String.valueOf(config.getSparkParallelism()); + String maxRetryNum = String.valueOf(config.getInstanceMaxRetryNum()); + + // sparkLauncher.addAppArgs(SparkCommand.COMPACT_RUN.toString(), master, sparkMemory, client.getBasePath(), + // client.getTableConfig().getTableName(), compactionInstantTime, parallelism, schemaFilePath, + // retry, propsFilePath); + sparkLauncher.addAppArgs("COMPACT_RUN", master, sparkMemory, instance.getBasePath(), + instance.getTableName(), instance.getInstant(), parallelism, "", maxRetryNum, ""); + + return sparkLauncher; + } +} diff --git a/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/handlers/ActionHandler.java 
b/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/handlers/ActionHandler.java new file mode 100644 index 0000000000000..ee12f86a911ce --- /dev/null +++ b/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/handlers/ActionHandler.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.service.manager.handlers; + +import org.apache.hudi.table.service.manager.entity.Instance; +import org.apache.hudi.table.service.manager.store.MetadataStore; + +import org.apache.hadoop.conf.Configuration; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class ActionHandler implements AutoCloseable { + private static Logger LOG = LogManager.getLogger(ActionHandler.class); + + protected final Configuration conf; + protected final MetadataStore metadataStore; + + private final CompactionHandler compactionHandler; + + public ActionHandler(Configuration conf, + MetadataStore metadataStore) { + this.conf = conf; + this.metadataStore = metadataStore; + boolean cacheEnable = metadataStore.getTableServiceManagerConfig().getInstanceCacheEnable(); + this.compactionHandler = new CompactionHandler(cacheEnable); + } + + public void scheduleCompaction(Instance instance) { + compactionHandler.scheduleCompaction(metadataStore, instance); + } + + // TODO: support clustering + public void scheduleClustering(Instance instance) { + + } + + @Override + public void close() throws Exception { + + } +} diff --git a/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/handlers/CompactionHandler.java b/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/handlers/CompactionHandler.java new file mode 100644 index 0000000000000..97a6c58c90fb6 --- /dev/null +++ b/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/handlers/CompactionHandler.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.service.manager.handlers; + +import org.apache.hudi.table.service.manager.common.ServiceContext; +import org.apache.hudi.table.service.manager.entity.Instance; +import org.apache.hudi.table.service.manager.store.MetadataStore; + +import org.jetbrains.annotations.NotNull; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** + * REST Handler servicing compaction requests. + */ +public class CompactionHandler { + private static Logger LOG = LogManager.getLogger(CompactionHandler.class); + protected boolean cacheEnable; + + public CompactionHandler(boolean cacheEnable) { + this.cacheEnable = cacheEnable; + } + + public void scheduleCompaction(MetadataStore metadataStore, + Instance instance) { + String recordKey = instance.getRecordKey(); + LOG.info("Start register compaction instance: " + recordKey); + if ((cacheEnable && ServiceContext.containsPendingInstant(recordKey)) + || metadataStore.getInstance(instance) != null) { + LOG.warn("Instance has existed, instance: " + instance); + } else { + metadataStore.saveInstance(instance); + } + if (cacheEnable) { + ServiceContext.refreshPendingInstant(recordKey); + } + } + + public void removeCompaction(@NotNull MetadataStore metadataStore, + Instance instance) { + LOG.info("Start remove compaction instance: " + instance.getIdentifier()); + // 1. 
check instance exist + Instance result = metadataStore.getInstance(instance); + if (result == null) { + throw new RuntimeException("Instance not exist: " + instance); + } + // 2. update status + metadataStore.updateStatus(instance); + } +} diff --git a/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/service/BaseService.java b/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/service/BaseService.java new file mode 100644 index 0000000000000..d4581e42fbddd --- /dev/null +++ b/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/service/BaseService.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Common lifecycle contract for the long-running background services of the
 * table service manager (clean, executor, monitor, restore, retry, schedule).
 * Implementations are expected to be driven as init -> startService -> stop.
 */
public interface BaseService {

  /** One-time initialization of internal resources (e.g. thread pools, clients). */
  void init();

  /** Starts the background work; call after {@link #init()}. */
  void startService();

  /** Stops the service and releases its resources. */
  void stop();

}
+ */ + +package org.apache.hudi.table.service.manager.service; + +import org.apache.hudi.table.service.manager.common.ServiceContext; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class CleanService implements BaseService { + + private static final Logger LOG = LogManager.getLogger(CleanService.class); + private ScheduledExecutorService service; + private long cacheInterval = 3600 * 1000; //ms + + @Override + public void init() { + LOG.info("Init service: " + CleanService.class.getName()); + this.service = Executors.newSingleThreadScheduledExecutor(); + } + + @Override + public void startService() { + LOG.info("Start service: " + CleanService.class.getName()); + service.scheduleAtFixedRate(new RetryRunnable(), 30, 300, TimeUnit.SECONDS); + } + + @Override + public void stop() { + LOG.info("Stop service: " + CleanService.class.getName()); + if (service != null && !service.isShutdown()) { + service.shutdown(); + } + } + + private class RetryRunnable implements Runnable { + + @Override + public void run() { + cleanCache(); + } + } + + private void cleanCache() { + long currentTime = System.currentTimeMillis(); + ConcurrentHashMap pendingInstances = ServiceContext.getPendingInstances(); + for (Map.Entry instance : pendingInstances.entrySet()) { + if (currentTime - instance.getValue() > cacheInterval) { + LOG.info("Instance has expired: " + instance.getKey()); + pendingInstances.remove(instance.getKey()); + } + } + } + +} diff --git a/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/service/ExecutorService.java b/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/service/ExecutorService.java new file mode 100644 
index 0000000000000..b2c62d2e98d86 --- /dev/null +++ b/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/service/ExecutorService.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.service.manager.service; + +import org.apache.hudi.table.service.manager.common.ServiceContext; +import org.apache.hudi.table.service.manager.executor.BaseActionExecutor; +import org.apache.hudi.table.service.manager.store.MetadataStore; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +public class ExecutorService implements BaseService { + + private static final Logger LOG = LogManager.getLogger(ExecutorService.class); + + private ThreadPoolExecutor executorService; + private ScheduledExecutorService service; + private BlockingQueue taskQueue; + private final MetadataStore metadataStore; + + public ExecutorService(MetadataStore metadataStore) { + this.metadataStore = metadataStore; + } + + public void init() { + service = Executors.newSingleThreadScheduledExecutor(); + int coreExecuteSize = metadataStore.getTableServiceManagerConfig().getScheduleCoreExecuteSize(); + int maxExecuteSize = metadataStore.getTableServiceManagerConfig().getScheduleMaxExecuteSize(); + executorService = new ThreadPoolExecutor(coreExecuteSize, maxExecuteSize, 60, + TimeUnit.SECONDS, new SynchronousQueue<>()); + taskQueue = new LinkedBlockingQueue<>(); + LOG.info("Init service: " + ExecutorService.class.getName() + ", coreExecuteSize: " + + coreExecuteSize + ", maxExecuteSize: " + maxExecuteSize); + } + + @Override + public void startService() { + LOG.info("Start service: " + ExecutorService.class.getName()); + service.submit(new ExecutionTask()); + } + + @Override + public void stop() { + LOG.info("Stop service: " + ExecutorService.class.getName()); + if (executorService != null && 
!executorService.isShutdown()) { + executorService.shutdown(); + } + if (service != null && service.isShutdown()) { + service.shutdown(); + } + LOG.info("Finish stop service: " + ExecutorService.class.getName()); + } + + private class ExecutionTask implements Runnable { + + @Override + public void run() { + while (true) { + try { + System.out.println("SPARK_HOME = " + System.getenv("SPARK_HOME")); + BaseActionExecutor executor = taskQueue.take(); + LOG.info("Start execute: " + executor); + executorService.execute(executor); + } catch (InterruptedException interruptedException) { + LOG.error("Occur exception when exec job: " + interruptedException); + } + } + } + } + + public void submitTask(BaseActionExecutor task) { + taskQueue.add(task); + } + + public int getFreeSize() { + return metadataStore.getTableServiceManagerConfig().getScheduleMaxExecuteSize() - ServiceContext.getRunningInstanceNum(); + } + +} diff --git a/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/service/MonitorService.java b/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/service/MonitorService.java new file mode 100644 index 0000000000000..84e69489e1d92 --- /dev/null +++ b/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/service/MonitorService.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.service.manager.service; + +import org.apache.hudi.table.service.manager.common.ServiceContext; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class MonitorService implements BaseService { + + private static final Logger LOG = LogManager.getLogger(MonitorService.class); + + private ScheduledExecutorService service; + + @Override + public void init() { + LOG.info("Init service: " + MonitorService.class); + this.service = Executors.newSingleThreadScheduledExecutor(); + } + + @Override + public void startService() { + LOG.info("Start service: " + MonitorService.class.getName()); + service.scheduleAtFixedRate(new MonitorRunnable(), 30, 180, TimeUnit.SECONDS); + } + + @Override + public void stop() { + LOG.info("Stop service: " + MonitorService.class.getName()); + if (service != null && !service.isShutdown()) { + service.shutdown(); + } + } + + private class MonitorRunnable implements Runnable { + + @Override + public void run() { + for (String info : ServiceContext.getRunningInstanceInfo()) { + LOG.info(info); + } + } + } +} diff --git a/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/service/RestoreService.java b/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/service/RestoreService.java new file mode 100644 index 
0000000000000..dd696b40b427c --- /dev/null +++ b/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/service/RestoreService.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Background service that reconciles instances recorded as RUNNING in the metadata
 * store against their actual application state on YARN, so instances orphaned by a
 * manager restart or a crashed submitter eventually reach a terminal status.
 */
public class RestoreService implements BaseService {

  private static final Logger LOG = LogManager.getLogger(RestoreService.class);

  private final MetadataStore metadataStore;
  private ScheduledExecutorService service;
  // Max seconds an instance may stay RUNNING without a YARN application id before
  // its submission is considered to have failed (from manager config).
  private final int submitJobTimeoutSec;
  private YarnClient yarnClient;

  public RestoreService(MetadataStore metadataStore) {
    this.metadataStore = metadataStore;
    this.submitJobTimeoutSec = metadataStore.getTableServiceManagerConfig().getInstanceSubmitTimeoutSec();
  }

  @Override
  public void init() {
    LOG.info("Init service: " + this.getClass().getName());
    this.service = Executors.newSingleThreadScheduledExecutor();
    // Shared client from the service context, (re)initialized with default YARN config.
    this.yarnClient = ServiceContext.getYarnClient();
    this.yarnClient.init(new YarnConfiguration());
    this.yarnClient.start();
    LOG.info("Finish init service: " + this.getClass().getName());
  }

  @Override
  public void startService() {
    // First reconciliation after 30s, then every 360s.
    service.scheduleAtFixedRate(new RestoreRunnable(), 30, 360, TimeUnit.SECONDS);
    LOG.info("Finish start service: " + this.getClass().getName());
  }

  @Override
  public void stop() {
    LOG.info("Stop service: " + this.getClass().getName());
    if (service != null && !service.isShutdown()) {
      service.shutdown();
    }
    LOG.info("Finish stop service: " + this.getClass().getName());
  }

  /** Periodic task: runs one reconciliation pass, never letting an error kill the schedule. */
  private class RestoreRunnable implements Runnable {

    @Override
    public void run() {
      try {
        LOG.info("Start process restore service");
        restoreRunningInstances();
        LOG.info("Finish process restore service");
      } catch (Throwable e) {
        LOG.error("Fail process restore service", e);
      }
    }
  }

  // Walks all RUNNING instances (-1 limit = no limit, per InstanceService) and fixes
  // their status from the real YARN application state.
  private void restoreRunningInstances() {
    Date curTime = new Date();
    List runningInstances = metadataStore.getInstances(InstanceStatus.RUNNING.getStatus(), -1);

    for (Instance instance : runningInstances) {
      String applicationId = instance.getApplicationId();
      try {
        // Grace period: leave instances scheduled less than 300s ago alone.
        if (curTime.before(DateTimeUtils.addSecond(instance.getScheduleTime(), 300))) {
          continue;
        }

        // No YARN application id yet: the submit either is still in flight or failed.
        // After the submit timeout, treat it as failed.
        if (StringUtils.isNullOrEmpty(applicationId)) {
          if (curTime.after(DateTimeUtils.addSecond(instance.getScheduleTime(), submitJobTimeoutSec))) {
            LOG.warn("Submit job timeout, should kill " + instance + ", curTime: " + curTime);
            instance.setStatus(InstanceStatus.FAILED.getStatus());
            metadataStore.updateStatus(instance);
          }
          continue;
        }

        // Map the terminal YARN state back onto the instance status:
        // FINISHED -> COMPLETED, FAILED/KILLED -> FAILED. Non-terminal states are logged only.
        ApplicationReport applicationReport = yarnClient.getApplicationReport(ApplicationId.fromString(applicationId));
        if (isFinished(applicationReport.getYarnApplicationState())) {
          LOG.info("Job has done " + applicationReport.getYarnApplicationState() + ", id: " + applicationId + ", instance " + instance);
          if (applicationReport.getYarnApplicationState() == YarnApplicationState.FINISHED) {
            instance.setStatus(InstanceStatus.COMPLETED.getStatus());
          } else {
            instance.setStatus(InstanceStatus.FAILED.getStatus());
          }
          metadataStore.updateStatus(instance);
        } else {
          LOG.info("Job is running: " + applicationId + ", instance: " + instance.getRecordKey());
        }
      } catch (Exception e) {
        // Per-instance failure: log and continue with the remaining instances.
        LOG.error("Fail restore Job: " + instance.getInstanceRunStatus(), e);
      }
    }
  }

  // True only for terminal YARN states; everything else (NEW, SUBMITTED, RUNNING, ...)
  // means the application may still make progress.
  private boolean isFinished(YarnApplicationState state) {
    switch (state) {
      case FINISHED:
      case FAILED:
      case KILLED:
        return true;
    }

    return false;
  }
}
+ */ + +package org.apache.hudi.table.service.manager.service; + +import org.apache.hudi.table.service.manager.entity.Instance; +import org.apache.hudi.table.service.manager.entity.InstanceStatus; +import org.apache.hudi.table.service.manager.store.MetadataStore; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class RetryService implements BaseService { + + private static final Logger LOG = LogManager.getLogger(RetryService.class); + + private MetadataStore metadataStore; + private ScheduledExecutorService service; + + public RetryService(MetadataStore metadataStore) { + this.metadataStore = metadataStore; + } + + @Override + public void init() { + LOG.info("Init service: " + RetryService.class.getName()); + //ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("Retry-Service-%d").build(); + this.service = Executors.newSingleThreadScheduledExecutor(); + } + + @Override + public void startService() { + LOG.info("Start service: " + RetryService.class.getName()); + service.scheduleAtFixedRate(new RetryRunnable(), 30, 180, TimeUnit.SECONDS); + } + + @Override + public void stop() { + LOG.info("Stop service: " + RetryService.class.getName()); + if (service != null && !service.isShutdown()) { + service.shutdown(); + } + } + + private class RetryRunnable implements Runnable { + + @Override + public void run() { + submitFailTask(); + } + } + + public void submitFailTask() { + List failInstances = metadataStore.getRetryInstances(); + for (Instance instance : failInstances) { + LOG.info("Start retry instance: " + instance.getIdentifier()); + instance.setStatus(InstanceStatus.SCHEDULED.getStatus()); + metadataStore.updateStatus(instance); + } + } +} diff --git 
a/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/service/ScheduleService.java b/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/service/ScheduleService.java new file mode 100644 index 0000000000000..847fabf9f86b8 --- /dev/null +++ b/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/service/ScheduleService.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.service.manager.service; + +import org.apache.hudi.table.service.manager.common.ServiceConfig; +import org.apache.hudi.table.service.manager.entity.Action; +import org.apache.hudi.table.service.manager.entity.Instance; +import org.apache.hudi.table.service.manager.entity.InstanceStatus; +import org.apache.hudi.table.service.manager.exception.HoodieTableServiceManagerException; +import org.apache.hudi.table.service.manager.executor.BaseActionExecutor; +import org.apache.hudi.table.service.manager.executor.CompactionExecutor; +import org.apache.hudi.table.service.manager.store.MetadataStore; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class ScheduleService implements BaseService { + + private static final Logger LOG = LogManager.getLogger(ScheduleService.class); + + private ScheduledExecutorService service; + private final ExecutorService executionService; + private final MetadataStore metadataStore; + private final long scheduleIntervalMs; + + public ScheduleService(ExecutorService executionService, + MetadataStore metadataStore) { + this.executionService = executionService; + this.metadataStore = metadataStore; + this.scheduleIntervalMs = metadataStore.getTableServiceManagerConfig().getScheduleIntervalMs(); + } + + @Override + public void init() { + LOG.info("Finish init schedule service, scheduleIntervalMs: " + scheduleIntervalMs); + this.service = Executors.newSingleThreadScheduledExecutor(); + } + + @Override + public void startService() { + LOG.info("Start service: " + ScheduleService.class.getName()); + service.scheduleAtFixedRate(new ScheduleRunnable(), 30, 60, TimeUnit.SECONDS); + } + + @Override + public void stop() { + LOG.info("Stop service: " + ScheduleService.class.getName()); + if (service != null && 
!service.isShutdown()) { + service.shutdown(); + } + } + + private class ScheduleRunnable implements Runnable { + + @Override + public void run() { + submitReadyTask(); + } + } + + public void submitReadyTask() { + int limitSize = executionService.getFreeSize(); + LOG.info("Start get ready instances, limitSize: " + limitSize); + if (limitSize > 0) { + List readyInstances = metadataStore.getInstances( + InstanceStatus.SCHEDULED.getStatus(), limitSize); + for (Instance readyInstance : readyInstances) { + if (waitSchedule(readyInstance)) { + LOG.info("Instance should wait schedule: " + readyInstance.getInstanceRunStatus()); + continue; + } + LOG.info("Schedule ready instances: " + readyInstance.getInstanceRunStatus()); + BaseActionExecutor executor = getActionExecutor(readyInstance); + executionService.submitTask(executor); + } + } + } + + private boolean waitSchedule(Instance instance) { + return instance.getAction() == Action.COMPACTION.getValue() + && instance.getUpdateTime().getTime() + scheduleIntervalMs + > System.currentTimeMillis(); + } + + protected BaseActionExecutor getActionExecutor(Instance instance) { + if (instance.getAction() == Action.COMPACTION.getValue()) { + return new CompactionExecutor(instance, metadataStore.getTableServiceManagerConfig()); + } else { + throw new HoodieTableServiceManagerException("Unsupported action " + instance.getAction()); + } + } + +} diff --git a/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/store/MetadataStore.java b/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/store/MetadataStore.java new file mode 100644 index 0000000000000..1f15dff4bd069 --- /dev/null +++ b/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/store/MetadataStore.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.service.manager.store; + +import org.apache.hudi.table.service.manager.common.HoodieTableServiceManagerConfig; +import org.apache.hudi.table.service.manager.entity.Instance; + +import java.util.List; + +public interface MetadataStore { + + void saveInstance(Instance instance); + + void updateStatus(Instance instance); + + void init(); + + Instance getInstance(Instance instance); + + List getInstances(int status, int limit); + + List getRetryInstances(); + + HoodieTableServiceManagerConfig getTableServiceManagerConfig(); +} diff --git a/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/store/impl/InstanceService.java b/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/store/impl/InstanceService.java new file mode 100644 index 0000000000000..f8aded9a67e96 --- /dev/null +++ b/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/store/impl/InstanceService.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.service.manager.store.impl; + +import org.apache.hudi.table.service.manager.common.ServiceContext; +import org.apache.hudi.table.service.manager.entity.AssistQueryEntity; +import org.apache.hudi.table.service.manager.entity.Instance; +import org.apache.hudi.table.service.manager.entity.InstanceStatus; + +import org.apache.hudi.table.service.manager.store.jdbc.JdbcMapper; + +import org.apache.ibatis.session.RowBounds; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +public class InstanceService { + + private static Logger LOG = LogManager.getLogger(InstanceService.class); + + private JdbcMapper jdbcMapper = ServiceContext.getJdbcMapper(); + + private static final String NAMESPACE = "Instance"; + + public void createInstance() { + try { + jdbcMapper.updateObject(statement(NAMESPACE, "createInstance"), null); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public void saveInstance(Instance instance) { + try { + jdbcMapper.saveObject(statement(NAMESPACE, "saveInstance"), instance); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public void updateStatus(Instance instance) { + try { + int ret = jdbcMapper.updateObject(statement(NAMESPACE, getUpdateStatusSqlId(instance)), instance); + if (ret != 1) { + 
LOG.error("Fail update status instance: " + instance); + throw new RuntimeException("Fail update status instance: " + instance.getIdentifier()); + } + LOG.info("Success update status instance: " + instance.getIdentifier()); + } catch (Exception e) { + LOG.error("Fail update status, instance: " + instance.getIdentifier() + ", errMsg: ", e); + throw new RuntimeException(e); + } + } + + public void updateExecutionInfo(Instance instance) { + int retryNum = 0; + try { + while (retryNum++ < 3) { + int ret = jdbcMapper.updateObject(statement(NAMESPACE, "updateExecutionInfo"), instance); + if (ret != 1) { + LOG.warn("Fail update execution info instance: " + instance); + TimeUnit.SECONDS.sleep(5); + } else { + LOG.info("Success update execution info, instance: " + instance.getIdentifier()); + return; + } + } + throw new RuntimeException("Fail update execution info: " + instance.getIdentifier()); + } catch (Exception e) { + LOG.error("Fail update status, instance: " + instance.getIdentifier() + ", errMsg: ", e); + throw new RuntimeException(e); + } + } + + public Instance getInstance(Instance instance) { + try { + return jdbcMapper.getObject(statement(NAMESPACE, "getInstance"), instance); + } catch (Exception e) { + LOG.error("Fail get Instance: " + instance.getIdentifier() + ", errMsg: ", e); + throw new RuntimeException(e); + } + } + + private String getUpdateStatusSqlId(Instance instance) { + switch (InstanceStatus.getInstance(instance.getStatus())) { + case SCHEDULED: + return "retryInstance"; + case RUNNING: + return "runningInstance"; + case COMPLETED: + return "successInstance"; + case FAILED: + return "failInstance"; + case INVALID: + return "invalidInstance"; + default: + throw new RuntimeException("Invalid instance: " + instance.getIdentifier()); + } + } + + public List getInstances(int status, int limit) { + try { + if (limit > 0) { + return jdbcMapper.getObjects(statement(NAMESPACE, "getInstances"), status, + new RowBounds(0, limit)); + } else { + return 
jdbcMapper.getObjects(statement(NAMESPACE, "getInstances"), status); + } + } catch (Exception e) { + LOG.error("Fail get instances, status: " + status + ", errMsg: ", e); + throw new RuntimeException("Fail get instances, status: " + status); + } + } + + public List getRetryInstances(int maxRetry) { + try { + return jdbcMapper.getObjects(statement(NAMESPACE, "getRetryInstances"), + new AssistQueryEntity(maxRetry)); + } catch (Exception e) { + LOG.error("Fail get retry instances, errMsg: ", e); + throw new RuntimeException("Fail get retry instances"); + } + } + + public List getInstanceAfterTime(AssistQueryEntity queryEntity) { + try { + return jdbcMapper.getObjects(statement(NAMESPACE, "getInstanceAfterTime"), queryEntity); + } catch (Exception e) { + LOG.error("Fail get instances after time, errMsg: ", e); + throw new RuntimeException("Fail get alert instances"); + } + } + + private String statement(String namespace, String sqlID) { + return namespace + "." + sqlID; + } +} \ No newline at end of file diff --git a/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/store/impl/RelationDBBasedStore.java b/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/store/impl/RelationDBBasedStore.java new file mode 100644 index 0000000000000..eda2ac15c102a --- /dev/null +++ b/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/store/impl/RelationDBBasedStore.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.service.manager.store.impl; + +import org.apache.hudi.table.service.manager.common.HoodieTableServiceManagerConfig; +import org.apache.hudi.table.service.manager.common.ServiceContext; +import org.apache.hudi.table.service.manager.entity.Instance; +import org.apache.hudi.table.service.manager.store.MetadataStore; + +import java.util.List; + +public class RelationDBBasedStore implements MetadataStore { + + private final InstanceService instanceDao; + private final HoodieTableServiceManagerConfig config; + + public RelationDBBasedStore(HoodieTableServiceManagerConfig config) { + this.config = config; + this.instanceDao = ServiceContext.getInstanceDao(); + } + + public HoodieTableServiceManagerConfig getTableServiceManagerConfig() { + return config; + } + + @Override + public void saveInstance(Instance instance) { + instanceDao.saveInstance(instance); + } + + @Override + public void updateStatus(Instance instance) { + instanceDao.updateStatus(instance); + } + + @Override + public void init() { + instanceDao.createInstance(); + } + + @Override + public Instance getInstance(Instance instance) { + return instanceDao.getInstance(instance); + } + + @Override + public List getInstances(int status, int limit) { + return instanceDao.getInstances(status, limit); + } + + @Override + public List getRetryInstances() { + return instanceDao.getRetryInstances(config.getInstanceMaxRetryNum()); + } +} diff --git 
a/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/store/jdbc/HikariDataSourceFactory.java b/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/store/jdbc/HikariDataSourceFactory.java new file mode 100644 index 0000000000000..c1ef06bc53fca --- /dev/null +++ b/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/store/jdbc/HikariDataSourceFactory.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * MyBatis data source factory backed by a HikariCP connection pool. Pool settings
 * (jdbcUrl, credentials, sizing) are read from {@code hikariPool.properties} on the
 * classpath.
 */
public class HikariDataSourceFactory extends UnpooledDataSourceFactory {
  private static final String PROPERTIES_PATH = "hikariPool.properties";

  public HikariDataSourceFactory() throws IOException {
    Properties properties = new Properties();
    properties.load(Resources.getResourceAsStream(PROPERTIES_PATH));
    HikariConfig config = new HikariConfig(properties);
    // UnpooledDataSourceFactory exposes this protected field to MyBatis via getDataSource().
    this.dataSource = new HikariDataSource(config);
  }
}
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.service.manager.store.jdbc; + +import java.util.List; +import org.apache.ibatis.session.RowBounds; +import org.apache.ibatis.session.SqlSession; + +public class JdbcMapper { + + public int saveObject(String sqlId, Object object) { + try (SqlSession sqlSession = SqlSessionFactoryUtil.openSqlSession()) { + int res = sqlSession.insert(sqlId, object); + sqlSession.commit(); + return res; + } + } + + public int updateObject(String sqlId, Object params) { + try (SqlSession sqlSession = SqlSessionFactoryUtil.openSqlSession()) { + int res = sqlSession.update(sqlId, params); + sqlSession.commit(); + return res; + } + } + + public T getObject(String sqlId, Object params) { + try (SqlSession sqlSession = SqlSessionFactoryUtil.openSqlSession()) { + return sqlSession.selectOne(sqlId, params); + } + } + + public List getObjects(String sqlId, Object params) { + try (SqlSession sqlSession = SqlSessionFactoryUtil.openSqlSession()) { + return sqlSession.selectList(sqlId, params); + } + } + + public List getObjects(String sqlId, Object params, RowBounds rowBounds) { + try (SqlSession sqlSession = SqlSessionFactoryUtil.openSqlSession()) { + return sqlSession.selectList(sqlId, params, rowBounds); + } + } +} diff --git a/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/store/jdbc/SqlSessionFactoryUtil.java b/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/store/jdbc/SqlSessionFactoryUtil.java new file mode 100644 index 0000000000000..06d0d9b84a1a7 --- /dev/null +++ b/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/store/jdbc/SqlSessionFactoryUtil.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.service.manager.store.jdbc; + +import org.apache.hudi.table.service.manager.exception.HoodieTableServiceManagerException; + +import org.apache.ibatis.io.Resources; +import org.apache.ibatis.session.SqlSession; +import org.apache.ibatis.session.SqlSessionFactory; +import org.apache.ibatis.session.SqlSessionFactoryBuilder; + +import java.io.IOException; +import java.io.InputStream; +import java.sql.PreparedStatement; +import java.util.stream.Collectors; + +public class SqlSessionFactoryUtil { + + private static final String CONFIG_PATH = "mybatis-config.xml"; + + private static SqlSessionFactory sqlSessionFactory; + private static final Class CLASS_LOCK = SqlSessionFactoryUtil.class; + + private SqlSessionFactoryUtil() { + + } + + public static void initSqlSessionFactory() { + try (InputStream inputStream = Resources.getResourceAsStream(CONFIG_PATH)) { + synchronized (CLASS_LOCK) { + if (sqlSessionFactory == null) { + sqlSessionFactory = new SqlSessionFactoryBuilder().build(inputStream); + } + } + } catch (IOException e) { + e.printStackTrace(); + } + } + + public static SqlSession openSqlSession() { + if (sqlSessionFactory == null) { + initSqlSessionFactory(); + init(); + } + return sqlSessionFactory.openSession(); + } + + public static void 
init() { + try { + String[] ddls = org.apache.commons.io.IOUtils.readLines( + SqlSessionFactoryUtil.class.getResourceAsStream("/table-service-manager.sql")) + .stream().filter(e -> !e.startsWith("--")) + .collect(Collectors.joining("")) + .split(";"); + for (String ddl : ddls) { + try (PreparedStatement statement = SqlSessionFactoryUtil.openSqlSession().getConnection() + .prepareStatement(ddl)) { + statement.execute(); + } + } + } catch (Exception e) { + throw new HoodieTableServiceManagerException("Unable to read init ddl file", e); + } + } + +} diff --git a/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/util/DateTimeUtils.java b/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/util/DateTimeUtils.java new file mode 100644 index 0000000000000..a99788a46c648 --- /dev/null +++ b/hudi-platform-service/hudi-table-service-manager/src/main/java/org/apache/hudi/table/service/manager/util/DateTimeUtils.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Date arithmetic helpers based on {@link Calendar}, used by the manager services
 * for schedule/timeout comparisons.
 */
public class DateTimeUtils {

  private DateTimeUtils() {
    // Utility class: no instances.
  }

  /**
   * Returns the current time shifted by {@code amount} calendar days
   * (negative values move back in time).
   *
   * @param amount number of days to add
   * @return the shifted time as a {@link Date}
   */
  public static Date addDay(int amount) {
    Calendar c = Calendar.getInstance();
    c.setTime(new Date());
    c.add(Calendar.DATE, amount);
    return c.getTime();
  }

  /**
   * Returns {@code date} shifted by {@code amount} seconds
   * (negative values move back in time).
   *
   * @param date   base time, not modified
   * @param amount number of seconds to add
   * @return the shifted time as a new {@link Date}
   */
  public static Date addSecond(Date date, int amount) {
    Calendar c = Calendar.getInstance();
    c.setTime(date);
    c.add(Calendar.SECOND, amount);
    return c.getTime();
  }
}
+ */ + +package org.apache.hudi.table.service.manager.util; + +import org.apache.hudi.table.service.manager.entity.Action; +import org.apache.hudi.table.service.manager.entity.Engine; +import org.apache.hudi.table.service.manager.entity.Instance; + +public class InstanceUtil { + + public static void checkArgument(Instance instance) { + if (instance.getExecutionEngine() == null) { + instance.setExecutionEngine(Engine.SPARK); + } + Engine.checkEngineType(instance); + Action.checkActionType(instance); + } +} diff --git a/hudi-platform-service/hudi-table-service-manager/src/main/resources/hikariPool.properties b/hudi-platform-service/hudi-table-service-manager/src/main/resources/hikariPool.properties new file mode 100644 index 0000000000000..22294d2617a18 --- /dev/null +++ b/hudi-platform-service/hudi-table-service-manager/src/main/resources/hikariPool.properties @@ -0,0 +1,21 @@ +### +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+### +#jdbcUrl=jdbc:h2:mem:tms;MODE=MYSQL +jdbcUrl=jdbc:mysql://localhost:3306/tms?user=root&password=12345678&useUnicode=true&characterEncoding=utf-8&auth_enable=true&useServerPrepStmts=false&rewriteBatchedStatements=true&useAffectedRows=true +#dataSource.user=root +#dataSource.password=password \ No newline at end of file diff --git a/hudi-platform-service/hudi-table-service-manager/src/main/resources/mybatis-config.xml b/hudi-platform-service/hudi-table-service-manager/src/main/resources/mybatis-config.xml new file mode 100644 index 0000000000000..c277f23864443 --- /dev/null +++ b/hudi-platform-service/hudi-table-service-manager/src/main/resources/mybatis-config.xml @@ -0,0 +1,42 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/hudi-platform-service/hudi-table-service-manager/src/main/resources/mybatis/Instance.xml b/hudi-platform-service/hudi-table-service-manager/src/main/resources/mybatis/Instance.xml new file mode 100644 index 0000000000000..44e66e46b65ce --- /dev/null +++ b/hudi-platform-service/hudi-table-service-manager/src/main/resources/mybatis/Instance.xml @@ -0,0 +1,178 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + CREATE TABLE if not exists `instance` + ( + `id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT 'primary key', + `db_name` varchar(128) NOT NULL COMMENT 'db name', + `table_name` varchar(128) NOT NULL COMMENT 'table name', + `base_path` varchar(128) NOT NULL COMMENT 'base path', + `execution_engine` varchar(128) NOT NULL COMMENT 'execution engine', + `user_name` varchar(128) NOT NULL COMMENT 'user_name', + `queue` varchar(128) NOT NULL COMMENT 'queue', + `resource` varchar(128) NOT NULL COMMENT 'resource', + `parallelism` varchar(128) NOT NULL COMMENT 'parallelism', + `auto_clean` int NOT NULL DEFAULT '0' COMMENT 'auto_clean', + `instant` varchar(128) NOT NULL COMMENT 'instant', + `action` int NOT NULL COMMENT 'action', + `status` int NOT NULL COMMENT 'status', + `run_times` int NOT NULL DEFAULT '0' 
COMMENT 'run times', + `application_id` varchar(128) DEFAULT NULL COMMENT 'application id', + `schedule_time` timestamp NULL DEFAULT NULL COMMENT 'schedule time', + `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time', + `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time', + PRIMARY KEY (`id`), + UNIQUE KEY `uniq_table_instant` (`db_name`,`table_name`,`instant`), + KEY `idx_status` (`status`), + KEY `idx_update_time_status` (`update_time`,`status`) + ) COMMENT='Table Management Service instance'; + + + + id, db_name, table_name, base_path, execution_engine, user_name, queue, resource, parallelism, + instant, action, status, run_times, application_id, schedule_time, create_time, update_time + + + + INSERT INTO instance (db_name, table_name, base_path, execution_engine, user_name, + queue, resource, parallelism, instant, action, status, run_times) + VALUES (#{dbName}, #{tableName}, #{basePath}, #{executionEngine}, #{userName}, + #{queue},#{resource}, #{parallelism}, #{instant}, #{action}, #{status}, 0) + + + + UPDATE instance + SET status = #{status}, + schedule_time = now(), + run_times = run_times + 1 + WHERE db_name = #{dbName} + and table_name = #{tableName} + and instant = #{instant} + and status = 0 + + + + UPDATE instance + SET status = #{status} + WHERE db_name = #{dbName} + and table_name = #{tableName} + and instant = #{instant} + + + + UPDATE instance + SET application_id = #{applicationId} + WHERE db_name = #{dbName} + and table_name = #{tableName} + and instant = #{instant} + + + + UPDATE instance + SET status = #{status} + WHERE db_name = #{dbName} + and table_name = #{tableName} + and instant = #{instant} + and status = 1 + + + + UPDATE instance + SET status = #{status} + WHERE db_name = #{dbName} + and table_name = #{tableName} + and instant = #{instant} + and status = 1 + + + + UPDATE instance + SET status = #{status} + WHERE db_name = #{dbName} + and table_name = 
#{tableName} + and instant = #{instant} + + + + + + + + + + + diff --git a/hudi-platform-service/hudi-table-service-manager/src/main/resources/table-service-manager.sql b/hudi-platform-service/hudi-table-service-manager/src/main/resources/table-service-manager.sql new file mode 100644 index 0000000000000..b55a71c533485 --- /dev/null +++ b/hudi-platform-service/hudi-table-service-manager/src/main/resources/table-service-manager.sql @@ -0,0 +1,44 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. 
+-- + +CREATE TABLE if not exists `instance` +( + `id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT 'primary key', + `db_name` varchar(128) NOT NULL COMMENT 'db name', + `table_name` varchar(128) NOT NULL COMMENT 'table name', + `base_path` varchar(128) NOT NULL COMMENT 'base path', + `execution_engine` varchar(128) NOT NULL COMMENT 'execution engine', + `user_name` varchar(128) NOT NULL COMMENT 'user_name', + `queue` varchar(128) NOT NULL COMMENT 'queue', + `resource` varchar(128) NOT NULL COMMENT 'resource', + `parallelism` varchar(128) NOT NULL COMMENT 'parallelism', + `auto_clean` int NOT NULL DEFAULT '0' COMMENT 'auto_clean', + `instant` varchar(128) NOT NULL COMMENT 'instant', + `action` int NOT NULL COMMENT 'action', + `status` int NOT NULL COMMENT 'status', + `run_times` int NOT NULL DEFAULT '0' COMMENT 'run times', + `application_id` varchar(128) DEFAULT NULL COMMENT 'application id', + `schedule_time` timestamp NULL DEFAULT NULL COMMENT 'schedule time', + `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time', + `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time', + PRIMARY KEY (`id`), + UNIQUE KEY `uniq_table_instant` (`db_name`,`table_name`,`instant`), + KEY `idx_status` (`status`), + KEY `idx_update_time_status` (`update_time`,`status`) +) COMMENT='Table Management Service instance'; + diff --git a/hudi-platform-service/hudi-table-service-manager/src/test/resources/log4j-surefire-quiet.properties b/hudi-platform-service/hudi-table-service-manager/src/test/resources/log4j-surefire-quiet.properties new file mode 100644 index 0000000000000..b21b5d4070c41 --- /dev/null +++ b/hudi-platform-service/hudi-table-service-manager/src/test/resources/log4j-surefire-quiet.properties @@ -0,0 +1,29 @@ +### +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +### +log4j.rootLogger=WARN, CONSOLE +log4j.logger.org.apache.hudi=DEBUG + +# CONSOLE is set to be a ConsoleAppender. +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +# CONSOLE uses PatternLayout. +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=[%-5p] %d %c %x - %m%n +log4j.appender.CONSOLE.filter.a=org.apache.log4j.varia.LevelRangeFilter +log4j.appender.CONSOLE.filter.a.AcceptOnMatch=true +log4j.appender.CONSOLE.filter.a.LevelMin=WARN +log4j.appender.CONSOLE.filter.a.LevelMax=FATAL diff --git a/hudi-platform-service/hudi-table-service-manager/src/test/resources/log4j-surefire.properties b/hudi-platform-service/hudi-table-service-manager/src/test/resources/log4j-surefire.properties new file mode 100644 index 0000000000000..c03e808cca1f8 --- /dev/null +++ b/hudi-platform-service/hudi-table-service-manager/src/test/resources/log4j-surefire.properties @@ -0,0 +1,30 @@ +### +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +### +log4j.rootLogger=WARN, CONSOLE +log4j.logger.org.apache=INFO +log4j.logger.org.apache.hudi=DEBUG + +# A1 is set to be a ConsoleAppender. +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +# A1 uses PatternLayout. +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n +log4j.appender.CONSOLE.filter.a=org.apache.log4j.varia.LevelRangeFilter +log4j.appender.CONSOLE.filter.a.AcceptOnMatch=true +log4j.appender.CONSOLE.filter.a.LevelMin=WARN +log4j.appender.CONSOLE.filter.a.LevelMax=FATAL \ No newline at end of file diff --git a/hudi-platform-service/pom.xml b/hudi-platform-service/pom.xml index 7c0234a7c5f65..e6c2a597d8363 100644 --- a/hudi-platform-service/pom.xml +++ b/hudi-platform-service/pom.xml @@ -32,5 +32,6 @@ hudi-metaserver + hudi-table-service-manager