Skip to content

Commit 4153dd2

Browse files
committed
Worker
1 parent 7a81176 commit 4153dd2

File tree

65 files changed

+1758
-183
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

65 files changed

+1758
-183
lines changed

allo-common/pom.xml

+43
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
~ Licensed to the Apache Software Foundation (ASF) under one or more
4+
~ contributor license agreements. See the NOTICE file distributed with
5+
~ this work for additional information regarding copyright ownership.
6+
~ The ASF licenses this file to You under the Apache License, Version 2.0
7+
~ (the "License"); you may not use this file except in compliance with
8+
~ the License. You may obtain a copy of the License at
9+
~
10+
~ http://www.apache.org/licenses/LICENSE-2.0
11+
~
12+
~ Unless required by applicable law or agreed to in writing, software
13+
~ distributed under the License is distributed on an "AS IS" BASIS,
14+
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
~ See the License for the specific language governing permissions and
16+
~ limitations under the License.
17+
-->
18+
19+
<project xmlns="http://maven.apache.org/POM/4.0.0"
20+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
21+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
22+
<parent>
23+
<artifactId>allo</artifactId>
24+
<groupId>io.github.artiship</groupId>
25+
<version>1.0-SNAPSHOT</version>
26+
</parent>
27+
<modelVersion>4.0.0</modelVersion>
28+
29+
<artifactId>allo-common</artifactId>
30+
31+
<properties>
32+
<maven.compiler.source>8</maven.compiler.source>
33+
<maven.compiler.target>8</maven.compiler.target>
34+
</properties>
35+
36+
<dependencies>
37+
<dependency>
38+
<groupId>io.github.artiship</groupId>
39+
<artifactId>allo-common</artifactId>
40+
</dependency>
41+
</dependencies>
42+
43+
</project>

allo-scheduler/src/main/java/io/github/artiship/allo/scheduler/core/Service.java allo-common/src/main/java/io/github/artiship/allo/common/Service.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18-
package io.github.artiship.allo.scheduler.core;
18+
package io.github.artiship.allo.common;
1919

2020
public interface Service {
2121
void start() throws Exception;

allo-model/src/main/java/io/github/artiship/allo/model/utils/TimeUtils.java allo-common/src/main/java/io/github/artiship/allo/common/TimeUtils.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18-
package io.github.artiship.allo.model.utils;
18+
package io.github.artiship.allo.common;
1919

2020
import java.time.Instant;
2121
import java.time.LocalDateTime;

allo-database/src/main/java/io/github/artiship/allo/database/entity/Job.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public class Job implements Persistable {
4545
private String scheduleCron;
4646
private Integer isSelfDependent;
4747
private Integer isSkipRun;
48-
private String ossPath;
48+
private String jobStoragePath;
4949
private Integer maxRetryTimes;
5050
private Long retryInterval;
5151
private String workerGroups;
@@ -81,7 +81,7 @@ public JobBo toJobBo() {
8181
.setRetryInterval(getRetryInterval())
8282
.setWorkerGroups(getListOfWorkerGroups())
8383
.setDescription(getDescription())
84-
.setOssPath(getOssPath());
84+
.setJobStoragePath(getJobStoragePath());
8585
}
8686

8787
public Boolean getIsSelfDependentBoolean() {
@@ -113,7 +113,7 @@ public Job updateIgnoreNull(Job schedulerJob) {
113113
if (schedulerJob.getWorkerGroups() != null) this.workerGroups = schedulerJob.getWorkerGroups();
114114
if (schedulerJob.getJobState() != null) this.jobState = schedulerJob.getJobState();
115115
if (schedulerJob.getDescription() != null) this.description = schedulerJob.getDescription();
116-
if (schedulerJob.getOssPath() != null) this.ossPath = schedulerJob.getOssPath();
116+
if (schedulerJob.getJobStoragePath() != null) this.jobStoragePath = schedulerJob.getJobStoragePath();
117117
if (schedulerJob.getCreateTime() != null) this.createTime = schedulerJob.getCreateTime();
118118
if (schedulerJob.getUpdateTime() != null) this.updateTime = schedulerJob.getUpdateTime();
119119
this.updateTime = now();

allo-database/src/main/java/io/github/artiship/allo/database/entity/Task.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public class Task {
3838
private Long jobId;
3939
private String taskName;
4040
private Integer taskState;
41-
private String ossPath;
41+
private String jobStoragePath;
4242
private String workerGroups;
4343
private String workerHost;
4444
private Integer workerPort;
@@ -71,7 +71,7 @@ public Task updateIgnoreNull(Task task) {
7171
if (task.getJobId() != null) this.jobId = task.getJobId();
7272
if (task.getTaskName() != null) this.taskName = task.getTaskName();
7373
if (task.getTaskState() != null) this.taskState = task.getTaskState();
74-
if (task.getOssPath() != null) this.ossPath = task.getOssPath();
74+
if (task.getJobStoragePath() != null) this.jobStoragePath = task.getJobStoragePath();
7575
if (task.getWorkerGroups() != null) this.workerGroups = task.getWorkerGroups();
7676
if (task.getWorkerHost() != null) this.workerHost = task.getWorkerHost();
7777
if (task.getWorkerPort() != null) this.workerPort = task.getWorkerPort();

allo-ha/pom.xml

+10
Original file line numberDiff line numberDiff line change
@@ -54,5 +54,15 @@
5454
<groupId>io.github.artiship</groupId>
5555
<artifactId>allo-model</artifactId>
5656
</dependency>
57+
58+
<dependency>
59+
<groupId>io.github.artiship</groupId>
60+
<artifactId>allo-common</artifactId>
61+
</dependency>
62+
63+
<dependency>
64+
<groupId>org.springframework.boot</groupId>
65+
<artifactId>spring-boot-starter-web</artifactId>
66+
</dependency>
5767
</dependencies>
5868
</project>

allo-ha/src/main/java/io/github.com/artiship/ha/GlobalConstants.java

+2
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,6 @@ public class GlobalConstants {
2222
public static final String SCHEDULER_GROUP = "/allo/scheduler";
2323
public static final String DEAD_WORKER_GROUP = "/allo/dead/workers";
2424
public static final String LOST_TASK_GROUP = "/allo/lost/tasks";
25+
public static final String ZK_PATH_SEPARATOR = "/";
26+
public static final String START_SHELL_FILE_NAME = "start.sh";
2527
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package io.github.com.artiship.ha;
19+
20+
import org.apache.curator.framework.CuratorFramework;
21+
import org.apache.curator.retry.ExponentialBackoffRetry;
22+
import org.springframework.beans.factory.annotation.Autowired;
23+
import org.springframework.beans.factory.annotation.Qualifier;
24+
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
25+
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
26+
import org.springframework.boot.context.properties.EnableConfigurationProperties;
27+
import org.springframework.context.annotation.Bean;
28+
import org.springframework.context.annotation.Configuration;
29+
30+
import static org.apache.curator.framework.CuratorFrameworkFactory.newClient;
31+
32+
@Configuration
33+
@ConditionalOnClass(value = {SchedulerLeaderRetrieval.class, CuratorFramework.class})
34+
@EnableConfigurationProperties(ZkConfig.class)
35+
public class HaAutoConfigure {
36+
@Autowired private ZkConfig zkConfig;
37+
38+
@Bean
39+
@ConditionalOnMissingBean
40+
public CuratorFramework zkClient() {
41+
final CuratorFramework client =
42+
newClient(
43+
zkConfig.getZkUrl(),
44+
zkConfig.getSessionTimeoutMs(),
45+
zkConfig.getConnectionTimeoutMs(),
46+
new ExponentialBackoffRetry(
47+
zkConfig.getRetryWait(), zkConfig.getReconnectAttempts()));
48+
client.start();
49+
return client;
50+
}
51+
52+
@Bean
53+
@ConditionalOnMissingBean
54+
public SchedulerLeaderRetrieval schedulerLeaderListener(
55+
@Qualifier("zkClient") CuratorFramework zkClient) {
56+
return new SchedulerLeaderRetrieval(zkClient);
57+
}
58+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package io.github.com.artiship.ha;
19+
20+
import io.github.artiship.allo.common.Service;
21+
import io.github.artiship.allo.model.ha.ZkScheduler;
22+
import io.github.com.artiship.ha.utils.CuratorUtils;
23+
import lombok.extern.slf4j.Slf4j;
24+
import org.apache.curator.framework.CuratorFramework;
25+
import org.apache.curator.framework.recipes.cache.NodeCache;
26+
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
27+
28+
import static io.github.com.artiship.ha.GlobalConstants.SCHEDULER_GROUP;
29+
import static java.nio.charset.StandardCharsets.UTF_8;
30+
import static org.apache.curator.framework.imps.CuratorFrameworkState.STARTED;
31+
32+
@Slf4j
33+
public class SchedulerLeaderRetrieval implements NodeCacheListener, Service {
34+
private final CuratorFramework zkClient;
35+
36+
private NodeCache masterCache;
37+
private ZkScheduler leader;
38+
39+
public SchedulerLeaderRetrieval(CuratorFramework zkClient) {
40+
this.zkClient = zkClient;
41+
}
42+
43+
@Override
44+
public void start() throws Exception {
45+
try {
46+
masterCache =
47+
new NodeCache(zkClient, CuratorUtils.createPath(zkClient, SCHEDULER_GROUP));
48+
masterCache.start(true);
49+
masterCache.getListenable().addListener(this);
50+
51+
retrieve();
52+
} catch (Exception e) {
53+
log.info("Start cache zk master node {} fail", SCHEDULER_GROUP, e);
54+
}
55+
}
56+
57+
@Override
58+
public void nodeChanged() {
59+
retrieve();
60+
}
61+
62+
private void retrieve() {
63+
leader = ZkScheduler.from(new String(masterCache.getCurrentData().getData(), UTF_8));
64+
}
65+
66+
public ZkScheduler getLeader() {
67+
return this.leader;
68+
}
69+
70+
public String getMasterHttpUrl() {
71+
return new StringBuffer("http://")
72+
.append(leader.getIp())
73+
.append(":")
74+
.append(leader.getHttpPort())
75+
.toString();
76+
}
77+
78+
@Override
79+
public void stop() throws Exception {
80+
if (zkClient.getState() == STARTED) {
81+
zkClient.close();
82+
}
83+
}
84+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package io.github.com.artiship.ha;
19+
20+
import io.github.artiship.allo.common.Service;
21+
import lombok.extern.slf4j.Slf4j;
22+
23+
import java.util.ArrayList;
24+
import java.util.List;
25+
import java.util.concurrent.CountDownLatch;
26+
27+
import static com.google.common.collect.Lists.reverse;
28+
import static java.lang.Runtime.getRuntime;
29+
30+
@Slf4j
31+
public abstract class ServiceManager {
32+
private List<Service> services = new ArrayList<>();
33+
private CountDownLatch isStop = new CountDownLatch(1);
34+
35+
public void registerAll(Service... list) {
36+
for (Service service : list) {
37+
services.add(service);
38+
}
39+
}
40+
41+
protected void start() {
42+
services.forEach(
43+
service -> {
44+
services.add(service);
45+
try {
46+
service.start();
47+
} catch (Exception e) {
48+
log.error("Service Manager start fail", e);
49+
stop();
50+
}
51+
});
52+
getRuntime().addShutdownHook(new Thread(() -> stop()));
53+
54+
blockUntilShutdown();
55+
}
56+
57+
private void blockUntilShutdown() {
58+
try {
59+
isStop.await();
60+
} catch (InterruptedException e) {
61+
log.error("Service Manager is stop latch await was terminated", e);
62+
}
63+
}
64+
65+
protected void stop() {
66+
reverse(services)
67+
.forEach(
68+
service -> {
69+
try {
70+
service.stop();
71+
} catch (Exception e) {
72+
log.error("Service Manager stop fail", e);
73+
}
74+
});
75+
76+
isStop.countDown();
77+
}
78+
}

allo-scheduler/src/main/java/io/github/artiship/allo/scheduler/core/TaskStateListener.java allo-ha/src/main/java/io/github.com/artiship/ha/TaskStateListener.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18-
package io.github.artiship.allo.scheduler.core;
18+
package io.github.com.artiship.ha;
1919

2020
import io.github.artiship.allo.model.bo.TaskBo;
2121

allo-scheduler/src/main/java/io/github/artiship/allo/scheduler/core/TaskStateListenerAdaptor.java allo-ha/src/main/java/io/github.com/artiship/ha/TaskStateListenerAdaptor.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18-
package io.github.artiship.allo.scheduler.core;
18+
package io.github.com.artiship.ha;
1919

2020
import io.github.artiship.allo.model.bo.TaskBo;
2121

0 commit comments

Comments
 (0)