Skip to content

Commit

Permalink
Provides the ability to obtain and manage orchestrator instances. #356
Browse files Browse the repository at this point in the history
  • Loading branch information
liuyou2 authored and yangzhiyue committed Aug 20, 2021
1 parent ab05f32 commit 60aba15
Show file tree
Hide file tree
Showing 8 changed files with 473 additions and 0 deletions.
71 changes: 71 additions & 0 deletions dss-orchestrator/dss-orchestrator-loader/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2019 WeBank
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>dss</artifactId>
<groupId>com.webank.wedatasphere.dss</groupId>
<version>1.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>dss-orchestrator-loader</artifactId>
<dependencies>
<dependency>
<groupId>jakarta.annotation</groupId>
<artifactId>jakarta.annotation-api</artifactId>
<version>1.3.5</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.webank.wedatasphere.dss</groupId>
<artifactId>dss-development-process-standard</artifactId>
<version>${dss.version}</version>

</dependency>
<dependency>
<groupId>com.webank.wedatasphere.dss</groupId>
<artifactId>dss-orchestrator-core</artifactId>
<version>${dss.version}</version>
</dependency>
<dependency>
<groupId>com.webank.wedatasphere.dss</groupId>
<artifactId>dss-orchestrator-db</artifactId>
<version>${dss.version}</version>
</dependency>
<dependency>
<groupId>com.webank.wedatasphere.dss</groupId>
<artifactId>dss-common</artifactId>
<version>${dss.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.webank.wedatasphere.dss</groupId>
<artifactId>dss-appconn-manager-core</artifactId>
<version>${dss.version}</version>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2019 WeBank
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package com.webank.wedatasphere.dss.orchestrator.loader;

import com.webank.wedatasphere.dss.appconn.core.AppConn;
import com.webank.wedatasphere.dss.appconn.manager.AppConnManager;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;



@Component
class DefaultLinkedAppConnResolver implements LinkedAppConnResolver {


private static final Logger LOGGER = LoggerFactory.getLogger(DefaultLinkedAppConnResolver.class);


static{
LOGGER.info("component resolver inited");
}

@Override
public List<AppConn> resolveAppConnByUser(String userName, String workspaceName, String typeName) {
//todo 后面可以使用数据库表来定义用户可以加载的AppConn.
List<AppConn> appConns = new ArrayList<>();
for(AppConn appConn : AppConnManager.getAppConnManager().listAppConns()){
//可以在这里根据用户情况和工作空间情况,限制appConn的加载
appConns.add(appConn);
}

return appConns;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright 2019 WeBank
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package com.webank.wedatasphere.dss.orchestrator.loader;

import com.webank.wedatasphere.dss.appconn.core.AppConn;
import com.webank.wedatasphere.dss.appconn.core.exception.AppConnErrorException;
import com.webank.wedatasphere.dss.appconn.core.ext.OnlyDevelopmentAppConn;
import com.webank.wedatasphere.dss.appconn.manager.AppConnManager;
import com.webank.wedatasphere.dss.common.label.DSSLabel;
import com.webank.wedatasphere.dss.orchestrator.core.DSSOrchestrator;
import com.webank.wedatasphere.dss.orchestrator.core.DSSOrchestratorContext;
import com.webank.wedatasphere.dss.orchestrator.core.impl.DefaultOrchestrator;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;


@Component
public class DefaultOrchestratorLoader implements OrchestratorLoader {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultOrchestratorLoader.class);
@Autowired
private DSSOrchestratorContext dssOrchestratorContext;

@Autowired
private LinkedAppConnResolver linkedAppConnResolver;


@Override
public DSSOrchestrator loadOrchestrator(String userName,
String workspaceName,
String typeName,
String appConnName,
List<DSSLabel> dssLabels) throws AppConnErrorException {

//todo load DSSOrchestatror by type name
DSSOrchestrator dssOrchestrator = new DefaultOrchestrator() {
@Override
protected DSSOrchestratorContext createOrchestratorContext() {
return dssOrchestratorContext;
}
};

//向工作流添加实现了第三级规范的AppConn
List<AppConn> appConnList = linkedAppConnResolver.resolveAppConnByUser(userName, workspaceName, typeName);
for (AppConn appConn : appConnList) {
if(appConn instanceof OnlyDevelopmentAppConn){
dssOrchestrator.addLinkedAppConn(appConn);
}

}
LOGGER.info("Load dss orchestrator:"+appConnName+",typeName:"+typeName);
AppConn appConn = AppConnManager.getAppConnManager().getAppConn(appConnName);
dssLabels.forEach(dssOrchestrator::addLinkedDssLabels);
dssOrchestrator.setAppConn(appConn);
return dssOrchestrator;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2019 WeBank
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package com.webank.wedatasphere.dss.orchestrator.loader;

import com.webank.wedatasphere.dss.appconn.core.AppConn;


import java.util.List;

public interface LinkedAppConnResolver {
List<AppConn> resolveAppConnByUser(String userName, String workspaceName, String typeName);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2019 WeBank
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package com.webank.wedatasphere.dss.orchestrator.loader;


import com.webank.wedatasphere.dss.appconn.core.exception.AppConnErrorException;
import com.webank.wedatasphere.dss.common.label.DSSLabel;
import com.webank.wedatasphere.dss.orchestrator.core.DSSOrchestrator;

import java.util.List;

public interface OrchestratorLoader {

/**
* 用于返回一个指定类型的的Orchestrator
* @param userName
* @param workspaceName
* @param typeName
* @param appConnName 唯一标识一种类型的AppConn,比如workflowOrchestratorAppConn
* @return
*/
DSSOrchestrator loadOrchestrator(String userName,
String workspaceName,
String typeName,
String appConnName,
List<DSSLabel> dssLabels) throws AppConnErrorException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2019 WeBank
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package com.webank.wedatasphere.dss.orchestrator.loader;

import com.webank.wedatasphere.dss.appconn.core.exception.AppConnErrorException;
import com.webank.wedatasphere.dss.common.label.DSSLabel;
import com.webank.wedatasphere.dss.orchestrator.core.DSSOrchestrator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;


@Component
public class OrchestratorManager {

private final static Logger logger = LoggerFactory.getLogger(OrchestratorManager.class);

private Map<String, DSSOrchestrator> cacheDssOrchestrator = new ConcurrentHashMap<>();

@Autowired
private DefaultOrchestratorLoader defaultOrchestratorLoader;

public DSSOrchestrator getOrCreateOrchestrator(String userName,
String workspaceName,
String typeName,
String appConnName,
List<DSSLabel> dssLabels) {
String findKey = getCacheKey(userName, workspaceName, typeName, appConnName);
DSSOrchestrator dssOrchestrator = cacheDssOrchestrator.get(findKey);
if (null == dssOrchestrator) {
try {

dssOrchestrator = defaultOrchestratorLoader.loadOrchestrator(userName, workspaceName, typeName, appConnName, dssLabels);

cacheDssOrchestrator.put(findKey, dssOrchestrator);
} catch (AppConnErrorException e) {
logger.error("OrchestratorManager get DSSOrchestrator exception!", e);
}
}
return dssOrchestrator;
}

protected String getCacheKey(String userName, String workspaceName, String typeName, String appConnName) {
return userName + "_" + workspaceName + "_" + typeName + "_" + appConnName;
}
}
Loading

0 comments on commit 60aba15

Please sign in to comment.