19
19
package com .dtstack .flinkx .launcher ;
20
20
21
21
import org .apache .commons .lang .StringUtils ;
22
- import org .apache .flink .client .deployment .ClusterRetrieveException ;
23
- import org .apache .flink .client .deployment .StandaloneClusterDescriptor ;
24
22
import org .apache .flink .client .program .ClusterClient ;
25
23
import org .apache .flink .configuration .Configuration ;
26
24
import org .apache .flink .configuration .GlobalConfiguration ;
38
36
import java .util .List ;
39
37
import java .util .Map ;
40
38
import java .util .Set ;
41
- import org .apache .flink .client .program .rest .RestClusterClient ;
42
39
import org .apache .hadoop .yarn .api .records .ApplicationId ;
40
+ import org .apache .flink .client .program .StandaloneClusterClient ;
43
41
44
42
/**
45
43
* The Factory of ClusterClient
49
47
*/
50
48
public class ClusterClientFactory {
51
49
52
- public static ClusterClient createClusterClient (LauncherOptions launcherOptions ) throws ClusterRetrieveException {
50
+ public static ClusterClient createClusterClient (LauncherOptions launcherOptions ) throws Exception {
53
51
String clientType = launcherOptions .getMode ();
54
52
if (ClusterMode .standalone .name ().equals (clientType )) {
55
53
return createStandaloneClient (launcherOptions );
@@ -59,11 +57,10 @@ public static ClusterClient createClusterClient(LauncherOptions launcherOptions)
59
57
throw new IllegalArgumentException ("Unsupported cluster client type: " );
60
58
}
61
59
62
- private static RestClusterClient createStandaloneClient (LauncherOptions launcherOptions ) throws ClusterRetrieveException {
60
+ private static ClusterClient createStandaloneClient (LauncherOptions launcherOptions ) throws Exception {
63
61
String flinkConfDir = launcherOptions .getFlinkconf ();
64
62
Configuration config = GlobalConfiguration .loadConfiguration (flinkConfDir );
65
- StandaloneClusterDescriptor descriptor = new StandaloneClusterDescriptor (config );
66
- RestClusterClient clusterClient = descriptor .retrieve (null );
63
+ StandaloneClusterClient clusterClient = new StandaloneClusterClient (config );
67
64
clusterClient .setDetached (true );
68
65
return clusterClient ;
69
66
}
0 commit comments