@@ -13,7 +13,12 @@
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.util.Configuration;
+import com.alibaba.datax.plugin.unstructuredstorage.reader.ColumnEntry;
import com.alibaba.datax.plugin.unstructuredstorage.reader.UnstructuredStorageReaderErrorCode;
+import com.alibaba.datax.plugin.unstructuredstorage.reader.UnstructuredStorageReaderUtil;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.hive.ql.io.orc.*;
@@ -28,9 +33,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

-/**
- * Created by mingya.wmy on 2015/8/12.
- */
public class DFSUtil {
    private static final Logger LOG = LoggerFactory.getLogger(HdfsReader.Job.class);

@@ -39,10 +41,21 @@ public class DFSUtil {
    private static final int DIRECTORY_SIZE_GUESS = 16 * 1024;

    private String specifiedFileType = null;
-
-    public DFSUtil(String defaultFS){
+
+    public DFSUtil(Configuration taskConfig){
        hadoopConf = new org.apache.hadoop.conf.Configuration();
-        hadoopConf.set("fs.defaultFS", defaultFS);
+        //io.file.buffer.size is a performance-tuning parameter
+        //http://blog.csdn.net/yangjl38/article/details/7583374
+        Configuration hadoopSiteParams = taskConfig.getConfiguration(Key.HADOOP_CONFIG);
+        JSONObject hadoopSiteParamsAsJsonObject = JSON.parseObject(taskConfig.getString(Key.HADOOP_CONFIG));
+        if (null != hadoopSiteParams) {
+            Set<String> paramKeys = hadoopSiteParams.getKeys();
+            for (String each : paramKeys) {
+                hadoopConf.set(each, hadoopSiteParamsAsJsonObject.getString(each));
+            }
+        }
+        hadoopConf.set("fs.defaultFS", taskConfig.getString(Key.DEFAULT_FS));
+        LOG.info(String.format("hadoopConfig details:%s", JSON.toJSONString(hadoopConf)));
    }

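For context, a minimal sketch of how the new constructor would typically be driven. The task-level key names ("defaultFS", "hadoopConfig") and the Configuration.from(String) factory are assumptions about the surrounding DataX plumbing, not something this change defines.

    // Hypothetical task configuration; key names and values are illustrative only.
    Configuration taskConfig = Configuration.from(
            "{\"defaultFS\": \"hdfs://namenode:8020\","
          + " \"hadoopConfig\": {\"io.file.buffer.size\": \"131072\"}}");
    DFSUtil dfsUtil = new DFSUtil(taskConfig);   // every hadoopConfig entry is copied into hadoopConf
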
@@ -159,7 +172,7 @@ public InputStream getInputStream(String filepath){
        return null;
    }

-    public BufferedReader getBufferedReader(String filepath, HdfsFileType fileType, String encoding){
+    public BufferedReader getBufferedReader(String filepath, HdfsFileType fileType, String encoding, int bufferSize){
        try {
            FileSystem fs = FileSystem.get(hadoopConf);
            Path path = new Path(filepath);
@@ -181,12 +194,12 @@ public BufferedReader getBufferedReader(String filepath, HdfsFileType fileType,
                //with a 20-second interval between retries
                in = fs.open(path);
                cin = codec.createInputStream(in);
-                br = new BufferedReader(new InputStreamReader(cin, encoding));
+                br = new BufferedReader(new InputStreamReader(cin, encoding), bufferSize);
            } else {
                //If the network is disconnected, this method will retry 45 times,
                //with a 20-second interval between retries
                in = fs.open(path);
-                br = new BufferedReader(new InputStreamReader(in, encoding));
+                br = new BufferedReader(new InputStreamReader(in, encoding), bufferSize);
            }
            return br;
        }catch (Exception e){
@@ -198,35 +211,37 @@ public BufferedReader getBufferedReader(String filepath, HdfsFileType fileType,
    public void orcFileStartRead(String sourceOrcFilePath, Configuration readerSliceConfig,
                                 RecordSender recordSender, TaskPluginCollector taskPluginCollector){

-        List<Configuration> columnConfigs = readerSliceConfig.getListConfiguration(Key.COLUMN);
-        String nullFormat = readerSliceConfig.getString(Key.NULL_FORMAT);
-        String allColumns = "";
-        String allColumnTypes = "";
+        //List<Configuration> columnConfigs = readerSliceConfig.getListConfiguration(Key.COLUMN);
+        List<ColumnEntry> column = UnstructuredStorageReaderUtil
+                .getListColumnEntry(readerSliceConfig, com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COLUMN);
+        String nullFormat = readerSliceConfig.getString(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.NULL_FORMAT);
+        StringBuilder allColumns = new StringBuilder();
+        StringBuilder allColumnTypes = new StringBuilder();
        boolean isReadAllColumns = false;
        int columnIndexMax = -1;
        // determine whether all columns should be read
-        if (null == columnConfigs || columnConfigs.size() == 0) {
+        if (null == column || column.size() == 0) {
            int allColumnsCount = getAllColumnsCount(sourceOrcFilePath);
            columnIndexMax = allColumnsCount - 1;
            isReadAllColumns = true;
        }
        else {
-            columnIndexMax = getMaxIndex(columnConfigs);
+            columnIndexMax = getMaxIndex(column);
        }
        for (int i = 0; i <= columnIndexMax; i++){
-            allColumns += "col";
-            allColumnTypes += "string";
+            allColumns.append("col");
+            allColumnTypes.append("string");
            if (i != columnIndexMax){
-                allColumns += ",";
-                allColumnTypes += ":";
+                allColumns.append(",");
+                allColumnTypes.append(":");
            }
        }
        if (columnIndexMax >= 0) {
            JobConf conf = new JobConf(hadoopConf);
            Path orcFilePath = new Path(sourceOrcFilePath);
            Properties p = new Properties();
-            p.setProperty("columns", allColumns);
-            p.setProperty("columns.types", allColumnTypes);
+            p.setProperty("columns", allColumns.toString());
+            p.setProperty("columns.types", allColumnTypes.toString());
            try {
                OrcSerde serde = new OrcSerde();
                serde.initialize(conf, p);
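To make the generated schema strings concrete: every synthesized column name is the literal "col" and every type is "string", so for columnIndexMax = 2 the loop above is equivalent to this sketch.

    // Net effect of the StringBuilder loop for columnIndexMax = 2.
    p.setProperty("columns", "col,col,col");
    p.setProperty("columns.types", "string:string:string");
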
@@ -236,6 +251,7 @@ public void orcFileStartRead(String sourceOrcFilePath, Configuration readerSlice

                //If the network is disconnected, will retry 45 times, with a 20-second interval between retries
                //Each file is handled as a single split
+                //TODO: read with multiple threads
                InputSplit[] splits = in.getSplits(conf, 1);

                RecordReader reader = in.getRecordReader(splits[0], conf, Reporter.NULL);
@@ -252,7 +268,7 @@ public void orcFileStartRead(String sourceOrcFilePath, Configuration readerSlice
                        Object field = inspector.getStructFieldData(value, fields.get(i));
                        recordFields.add(field);
                    }
-                    transportOneRecord(columnConfigs, recordFields, recordSender,
+                    transportOneRecord(column, recordFields, recordSender,
                            taskPluginCollector, isReadAllColumns, nullFormat);
                }
                reader.close();
@@ -269,7 +285,7 @@ public void orcFileStartRead(String sourceOrcFilePath, Configuration readerSlice
        }
    }

-    private Record transportOneRecord(List<Configuration> columnConfigs, List<Object> recordFields
+    private Record transportOneRecord(List<ColumnEntry> columnConfigs, List<Object> recordFields
            , RecordSender recordSender, TaskPluginCollector taskPluginCollector, boolean isReadAllColumns, String nullFormat){
        Record record = recordSender.createRecord();
        Column columnGenerated = null;
@@ -286,11 +302,10 @@ private Record transportOneRecord(List<Configuration> columnConfigs, List<Object
            }
        }
        else {
-            for (Configuration columnConfig : columnConfigs) {
-                String columnType = columnConfig
-                        .getNecessaryValue(Key.TYPE, HdfsReaderErrorCode.CONFIG_INVALID_EXCEPTION);
-                Integer columnIndex = columnConfig.getInt(Key.INDEX);
-                String columnConst = columnConfig.getString(Key.VALUE);
+            for (ColumnEntry columnConfig : columnConfigs) {
+                String columnType = columnConfig.getType();
+                Integer columnIndex = columnConfig.getIndex();
+                String columnConst = columnConfig.getValue();

                String columnValue = null;

@@ -343,7 +358,7 @@ private Record transportOneRecord(List<Configuration> columnConfigs, List<Object
                            Date date = null;
                            columnGenerated = new DateColumn(date);
                        } else {
-                            String formatString = columnConfig.getString(Key.FORMAT);
+                            String formatString = columnConfig.getFormat();
                            if (StringUtils.isNotBlank(formatString)) {
                                // convert using the format configured by the user
                                SimpleDateFormat format = new SimpleDateFormat(
@@ -410,10 +425,10 @@ private int getAllColumnsCount(String filePath){
        }
    }

-    private int getMaxIndex(List<Configuration> columnConfigs){
+    private int getMaxIndex(List<ColumnEntry> columnConfigs){
        int maxIndex = -1;
-        for (Configuration columnConfig : columnConfigs) {
-            Integer columnIndex = columnConfig.getInt(Key.INDEX);
+        for (ColumnEntry columnConfig : columnConfigs) {
+            Integer columnIndex = columnConfig.getIndex();
            if (columnIndex != null && columnIndex < 0) {
                String message = String.format("您column中配置的index不能小于0,请修改为正确的index");
                LOG.error(message);
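
For reference, a sketch of the column configuration that feeds the ColumnEntry accessors used throughout this change (getType()/getIndex()/getValue()/getFormat()). The literal key name "column", the JSON field names, and Configuration.from(String) are assumptions inferred from those accessors; only the getListColumnEntry call itself appears in the diff.

    // Hypothetical reader slice configuration; values are illustrative only.
    Configuration readerSliceConfig = Configuration.from(
            "{\"column\": ["
          + "  {\"index\": 0, \"type\": \"long\"},"
          + "  {\"index\": 3, \"type\": \"date\", \"format\": \"yyyy-MM-dd\"},"
          + "  {\"type\": \"string\", \"value\": \"constant\"}"
          + "]}");
    List<ColumnEntry> column = UnstructuredStorageReaderUtil.getListColumnEntry(
            readerSliceConfig, com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COLUMN);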