28
28
import org .slf4j .Logger ;
29
29
import org .slf4j .LoggerFactory ;
30
30
31
+ import java .io .File ;
31
32
import java .io .InputStream ;
32
33
import java .util .ArrayList ;
33
34
import java .util .HashSet ;
@@ -57,6 +58,8 @@ public static class Job
57
58
private String connectPattern ;
58
59
private int maxTraversalLevel ;
59
60
61
+ private static final String DEFAULT_PRIVATE_KEY = "~/.ssh/id_rsa" ;
62
+
60
63
private FtpHelper ftpHelper = null ;
61
64
62
65
@ Override
@@ -67,6 +70,8 @@ public void init()
67
70
68
71
this .validateParameter ();
69
72
StorageReaderUtil .validateParameter (this .originConfig );
73
+ String keyPath = this .originConfig .getString (FtpKey .KEY_PATH , null );
74
+ String keyPass = this .originConfig .getString (FtpKey .KEY_PASS , null );
70
75
71
76
if ("sftp" .equals (protocol )) {
72
77
//sftp协议
@@ -78,20 +83,20 @@ else if ("ftp".equals(protocol)) {
78
83
this .port = originConfig .getInt (FtpKey .PORT , FtpConstant .DEFAULT_FTP_PORT );
79
84
this .ftpHelper = new StandardFtpHelper ();
80
85
}
81
- ftpHelper .loginFtpServer (host , username , password , port , timeout , connectPattern );
86
+ ftpHelper .loginFtpServer (host , username , password , port , keyPath , keyPass , timeout , connectPattern );
82
87
}
83
88
84
89
private void validateParameter ()
85
90
{
86
- this .protocol = this .originConfig .getNecessaryValue (FtpKey .PROTOCOL , FtpReaderErrorCode .REQUIRED_VALUE );
91
+ this .protocol = this .originConfig .getNecessaryValue (FtpKey .PROTOCOL , FtpReaderErrorCode .REQUIRED_VALUE ). toLowerCase () ;
87
92
boolean protocolTag = "ftp" .equals (this .protocol ) || "sftp" .equals (this .protocol );
88
93
if (!protocolTag ) {
89
94
throw AddaxException .asAddaxException (FtpReaderErrorCode .ILLEGAL_VALUE ,
90
95
String .format ("仅支持 ftp和sftp 传输协议 , 不支持您配置的传输协议: [%s]" , protocol ));
91
96
}
92
97
this .host = this .originConfig .getNecessaryValue (FtpKey .HOST , FtpReaderErrorCode .REQUIRED_VALUE );
93
98
this .username = this .originConfig .getNecessaryValue (FtpKey .USERNAME , FtpReaderErrorCode .REQUIRED_VALUE );
94
- this .password = this .originConfig .getNecessaryValue (FtpKey .PASSWORD , FtpReaderErrorCode . REQUIRED_VALUE );
99
+ this .password = this .originConfig .getString (FtpKey .PASSWORD , null );
95
100
this .timeout = originConfig .getInt (FtpKey .TIME_OUT , FtpConstant .DEFAULT_TIMEOUT );
96
101
this .maxTraversalLevel = originConfig .getInt (FtpKey .MAX_TRAVERSAL_LEVEL , FtpConstant .DEFAULT_MAX_TRAVERSAL_LEVEL );
97
102
@@ -114,7 +119,7 @@ private void validateParameter()
114
119
}
115
120
else {
116
121
path = this .originConfig .getList (Key .PATH , String .class );
117
- if (null == path || path .isEmpty () ) {
122
+ if (null == path || path .isEmpty ()) {
118
123
throw AddaxException .asAddaxException (FtpReaderErrorCode .REQUIRED_VALUE , "您需要指定待读取的源目录或文件" );
119
124
}
120
125
for (String eachPath : path ) {
@@ -125,6 +130,28 @@ private void validateParameter()
125
130
}
126
131
}
127
132
}
133
+ if ("sftp" .equals (protocol )) {
134
+ // use ssh private key or not ?
135
+ boolean useKey = this .originConfig .getBool (FtpKey .USE_KEY , false );
136
+ if (useKey ) {
137
+ String privateKey = this .originConfig .getString (FtpKey .KEY_PATH , DEFAULT_PRIVATE_KEY );
138
+ // check privateKey does exist or not
139
+ if (privateKey .startsWith ("~" )) {
140
+ // expand home directory
141
+ privateKey = privateKey .replaceFirst ("^~" , System .getProperty ("user.home" ));
142
+ // does it exist?
143
+ boolean isFile = new File (privateKey ).isFile ();
144
+ if (isFile ) {
145
+ this .originConfig .set (FtpKey .KEY_PATH , privateKey );
146
+ }
147
+ else {
148
+ String msg = "You have configured to use the key, but neither the configured key file nor the default file(" +
149
+ DEFAULT_PRIVATE_KEY + " exists" ;
150
+ throw AddaxException .asAddaxException (FtpReaderErrorCode .ILLEGAL_VALUE , msg );
151
+ }
152
+ }
153
+ }
154
+ }
128
155
}
129
156
130
157
@ Override
@@ -224,7 +251,8 @@ public void init()
224
251
this .username = readerSliceConfig .getString (FtpKey .USERNAME );
225
252
String password = readerSliceConfig .getString (FtpKey .PASSWORD );
226
253
int timeout = readerSliceConfig .getInt (FtpKey .TIME_OUT , FtpConstant .DEFAULT_TIMEOUT );
227
-
254
+ String keyPath = readerSliceConfig .getString (FtpKey .KEY_PATH , null );
255
+ String keyPass = readerSliceConfig .getString (FtpKey .KEY_PASS , null );
228
256
this .sourceFiles = this .readerSliceConfig .getList (FtpKey .SOURCE_FILES , String .class );
229
257
230
258
if ("sftp" .equals (protocol )) {
@@ -238,7 +266,7 @@ else if ("ftp".equals(protocol)) {
238
266
this .connectPattern = readerSliceConfig .getString (FtpKey .CONNECT_PATTERN , FtpConstant .DEFAULT_FTP_CONNECT_PATTERN );// 默认为被动模式
239
267
this .ftpHelper = new StandardFtpHelper ();
240
268
}
241
- ftpHelper .loginFtpServer (host , username , password , port , timeout , connectPattern );
269
+ ftpHelper .loginFtpServer (host , username , password , port , keyPath , keyPass , timeout , connectPattern );
242
270
}
243
271
244
272
@ Override
0 commit comments