34
34
import org .slf4j .LoggerFactory ;
35
35
36
36
import java .io .File ;
37
- import java .io .FileOutputStream ;
38
37
import java .io .FilenameFilter ;
39
38
import java .io .IOException ;
40
39
import java .io .OutputStream ;
40
+ import java .nio .file .Files ;
41
+ import java .nio .file .Paths ;
41
42
import java .util .ArrayList ;
42
43
import java .util .Arrays ;
43
44
import java .util .HashSet ;
56
57
import static com .wgzhao .addax .common .spi .ErrorCode .EXECUTE_FAIL ;
57
58
import static com .wgzhao .addax .common .spi .ErrorCode .ILLEGAL_VALUE ;
58
59
import static com .wgzhao .addax .common .spi .ErrorCode .IO_ERROR ;
60
+ import static com .wgzhao .addax .common .spi .ErrorCode .NOT_SUPPORT_TYPE ;
59
61
import static com .wgzhao .addax .common .spi .ErrorCode .PERMISSION_ERROR ;
60
62
import static com .wgzhao .addax .common .spi .ErrorCode .REQUIRED_VALUE ;
61
63
import static com .wgzhao .addax .common .spi .ErrorCode .RUNTIME_ERROR ;
@@ -84,7 +86,7 @@ public void init()
84
86
this .writerSliceConfig .set (DATE_FORMAT , dateFormatOld );
85
87
}
86
88
if (null != dateFormatOld ) {
87
- LOG .warn ("You are using format to configure date format, this is not recommended, please use dateFormat to configure. If both dateFormat and format exist, dateFormat will be used ." );
89
+ LOG .warn ("The ` format` item has been deprecated; please use ` dateFormat` for configuration ." );
88
90
}
89
91
StorageWriterUtil .validateParameter (this .writerSliceConfig );
90
92
}
@@ -103,12 +105,15 @@ private void validateParameter()
103
105
String .format ("The path [%s] is a file, not a directory." , path ));
104
106
}
105
107
106
- if (!dir .exists ()) {
107
- if (!dir .mkdirs ()) {
108
- throw AddaxException .asAddaxException (
109
- EXECUTE_FAIL ,
110
- String .format ("Failed to create directory [%s]." , path ));
111
- }
108
+ if (!dir .exists () && !dir .mkdirs ()) {
109
+ throw AddaxException .asAddaxException (
110
+ EXECUTE_FAIL ,
111
+ "Failed to create directory: " + path );
112
+ }
113
+ else if (!dir .canWrite ()) {
114
+ throw AddaxException .asAddaxException (
115
+ PERMISSION_ERROR ,
116
+ "No write permission on directory: " + path );
112
117
}
113
118
}
114
119
@@ -119,7 +124,6 @@ public void prepare()
119
124
String fileName = this .writerSliceConfig .getString (FILE_NAME );
120
125
String writeMode = this .writerSliceConfig .getString (WRITE_MODE );
121
126
122
- assert FileHelper .checkDirectoryWritable (path );
123
127
File dir = new File (path );
124
128
// truncate option handler
125
129
if ("truncate" .equalsIgnoreCase (writeMode ) || "overwrite" .equalsIgnoreCase (writeMode )) {
@@ -138,10 +142,6 @@ public void prepare()
138
142
throw AddaxException .asAddaxException (RUNTIME_ERROR ,
139
143
String .format ("NullPointException occurred when clean history files under this path [%s]." , path ), npe );
140
144
}
141
- catch (IllegalArgumentException iae ) {
142
- throw AddaxException .asAddaxException (PERMISSION_ERROR ,
143
- String .format ("IllegalArgumentException occurred when clean history files under this path [%s]." , path ), iae );
144
- }
145
145
catch (IOException e ) {
146
146
throw AddaxException .asAddaxException (IO_ERROR ,
147
147
String .format ("IOException occurred when clean history files under this path [%s]." , path ), e );
@@ -156,7 +156,7 @@ else if ("nonConflict".equals(writeMode)) {
156
156
if (dir .exists ()) {
157
157
if (!dir .canRead ()) {
158
158
throw AddaxException .asAddaxException (PERMISSION_ERROR ,
159
- String . format ( "Permission denied to access path [%s]." , path ) );
159
+ "Permission denied to access path " + path );
160
160
}
161
161
// fileName is not null
162
162
FilenameFilter filter = new PrefixFileFilter (fileName );
@@ -172,26 +172,13 @@ else if ("nonConflict".equals(writeMode)) {
172
172
String .format ("The directory [%s] contains files." , path ));
173
173
}
174
174
}
175
- else {
176
- boolean createdOk = dir .mkdirs ();
177
- if (!createdOk ) {
178
- throw AddaxException .asAddaxException (EXECUTE_FAIL ,
179
- String .format ("Failed to create the file [%s]." , path ));
180
- }
181
- }
182
175
}
183
176
else {
184
- throw AddaxException .asAddaxException (ILLEGAL_VALUE ,
185
- String . format ( "ONLY support truncate, append and nonConflict as writeMode , but you give [%s]." , writeMode ) );
177
+ throw AddaxException .asAddaxException (NOT_SUPPORT_TYPE ,
178
+ "Only ' truncate', ' append', and ' nonConflict' are supported as write modes , but you provided " + writeMode );
186
179
}
187
180
}
188
181
189
- @ Override
190
- public void post ()
191
- {
192
- //
193
- }
194
-
195
182
@ Override
196
183
public void destroy ()
197
184
{
@@ -258,7 +245,7 @@ public void startWrite(RecordReceiver lineReceiver)
258
245
try {
259
246
File newFile = new File (fileFullPath );
260
247
assert newFile .createNewFile ();
261
- outputStream = new FileOutputStream (newFile );
248
+ outputStream = Files . newOutputStream (newFile . toPath () );
262
249
StorageWriterUtil .writeToStream (lineReceiver , outputStream , this .writerSliceConfig , this .fileName ,
263
250
this .getTaskPluginCollector ());
264
251
}
@@ -276,12 +263,6 @@ public void startWrite(RecordReceiver lineReceiver)
276
263
LOG .info ("end do write" );
277
264
}
278
265
279
- @ Override
280
- public void post ()
281
- {
282
- //
283
- }
284
-
285
266
@ Override
286
267
public void destroy ()
287
268
{
0 commit comments