[update][plugin][hdfsreader] Refactor: extract orc file reader logic into separate class
wgzhao committed Oct 22, 2024
1 parent 2d73b23 commit b2a75a4
Showing 2 changed files with 184 additions and 156 deletions.
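For context: this commit moves the ORC-reading logic out of DFSUtil into a new MyOrcReader class, mirroring the existing MyParquetReader delegate. Judging only from the call sites visible in this diff, the extracted class is shaped roughly like the sketch below; the constructor arguments and the reader(...) method come from the diff, while the body is an assumption summarizing the removed code further down.

    // Sketch inferred from the call site in orcFileStartRead below; the actual
    // MyOrcReader source (the second changed file, not shown here) may differ.
    public class MyOrcReader
    {
        private final org.apache.hadoop.conf.Configuration hadoopConf;
        private final Path path;              // the ORC file to read
        private final String nullFormat;      // string that represents NULL values
        private final List<ColumnEntry> columns;

        public MyOrcReader(org.apache.hadoop.conf.Configuration hadoopConf, Path path,
                String nullFormat, List<ColumnEntry> columns)
        {
            this.hadoopConf = hadoopConf;
            this.path = path;
            this.nullFormat = nullFormat;
            this.columns = columns;
        }

        public void reader(RecordSender recordSender, TaskPluginCollector taskPluginCollector)
        {
            // Assumed body: open the file via OrcFile.createReader, fall back to the
            // file schema when no columns are configured, then iterate
            // VectorizedRowBatch batches and emit one Record per row, i.e. the
            // logic removed from orcFileStartRead in the diff below.
        }
    }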
@@ -24,15 +24,7 @@
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import com.wgzhao.addax.common.base.Key;
import com.wgzhao.addax.common.element.BytesColumn;
import com.wgzhao.addax.common.element.Column;
import com.wgzhao.addax.common.element.ColumnEntry;
import com.wgzhao.addax.common.element.DateColumn;
import com.wgzhao.addax.common.element.DoubleColumn;
import com.wgzhao.addax.common.element.LongColumn;
import com.wgzhao.addax.common.element.Record;
import com.wgzhao.addax.common.element.StringColumn;
import com.wgzhao.addax.common.element.TimestampColumn;
import com.wgzhao.addax.common.exception.AddaxException;
import com.wgzhao.addax.common.plugin.RecordSender;
import com.wgzhao.addax.common.plugin.TaskPluginCollector;
@@ -42,16 +34,7 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.RCFileRecordReader;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.Reader;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
import org.apache.hadoop.io.LongWritable;
@@ -61,14 +44,11 @@
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.orc.TypeDescription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
@@ -95,10 +75,14 @@ public class DFSUtil
private String specifiedFileType = null;
private String kerberosKeytabFilePath;
private String kerberosPrincipal;
private final List<ColumnEntry> columns;
private final String nullFormat;

public DFSUtil(Configuration taskConfig)
{
hadoopConf = new org.apache.hadoop.conf.Configuration();
this.columns = StorageReaderUtil.getListColumnEntry(taskConfig, COLUMN);
this.nullFormat = taskConfig.getString(NULL_FORMAT);
//io.file.buffer.size is a performance tuning parameter
//http://blog.csdn.net/yangjl38/article/details/7583374
Configuration hadoopSiteParams = taskConfig.getConfiguration(Key.HADOOP_CONFIG);
@@ -118,10 +102,11 @@ public DFSUtil(Configuration taskConfig)
this.kerberosPrincipal = taskConfig.getString(Key.KERBEROS_PRINCIPAL);
this.hadoopConf.set(HdfsConstant.HADOOP_SECURITY_AUTHENTICATION_KEY, "kerberos");
// fix Failed to specify server's Kerberos principal name
if (Objects.equals(hadoopConf.get("dfs.namenode.kerberos.principal", ""), "")) {
String KEY_PRINCIPAL = "dfs.namenode.kerberos.principal";
if (Objects.equals(hadoopConf.get(KEY_PRINCIPAL, ""), "")) {
// get REALM
String serverPrincipal = "nn/_HOST@" + Iterables.get(Splitter.on('@').split(this.kerberosPrincipal), 1);
hadoopConf.set("dfs.namenode.kerberos.principal", serverPrincipal);
hadoopConf.set(KEY_PRINCIPAL, serverPrincipal);
}
}
this.kerberosAuthentication(this.kerberosPrincipal, this.kerberosKeytabFilePath);
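The block above fixes the "Failed to specify server's Kerberos principal name" error by deriving the NameNode principal from the configured client principal's realm whenever dfs.namenode.kerberos.principal is unset. A minimal illustration of that derivation, using a hypothetical principal value (Splitter and Iterables are the Guava classes imported at the top of this file):

    // Hypothetical client principal; the realm is whatever follows '@'.
    String kerberosPrincipal = "addax/client.example.com@EXAMPLE.COM";
    String realm = Iterables.get(Splitter.on('@').split(kerberosPrincipal), 1); // "EXAMPLE.COM"
    String serverPrincipal = "nn/_HOST@" + realm;  // "nn/_HOST@EXAMPLE.COM"
    // Hadoop substitutes the _HOST placeholder with the NameNode's hostname at connect time.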
@@ -180,7 +165,6 @@ private void addSourceFileIfNotEmpty(FileStatus f)

public void getHDFSAllFiles(String hdfsPath)
{

try {
FileSystem hdfs = FileSystem.get(hadoopConf);
//check whether hdfsPath contains regex wildcard characters
@@ -209,8 +193,6 @@ else if (f.isDirectory()) {
private void getHDFSAllFilesNORegex(String path, FileSystem hdfs)
throws IOException
{

// get the root directory of the files to be read
Path listFiles = new Path(path);

// If the network is disconnected, this method will retry 45 times
@@ -245,11 +227,9 @@ private void addSourceFileByType(String filePath)
sourceHDFSAllFilesList.add(filePath);
}
else {
String message = String.format("The file [%s] format is not the same of [%s] you configured."
, filePath, this.specifiedFileType);
String message = String.format("The file [%s] format does not match the configured type [%s].", filePath, specifiedFileType);
LOG.error(message);
throw AddaxException.asAddaxException(
NOT_SUPPORT_TYPE, message);
throw AddaxException.asAddaxException(NOT_SUPPORT_TYPE, message);
}
}

@@ -276,8 +256,7 @@ public void sequenceFileStartRead(String sourceSequenceFilePath, Configuration r
LOG.info("Begin to read the sequence file [{}].", sourceSequenceFilePath);

Path seqFilePath = new Path(sourceSequenceFilePath);
try (SequenceFile.Reader reader = new SequenceFile.Reader(this.hadoopConf,
SequenceFile.Reader.file(seqFilePath))) {
try (SequenceFile.Reader reader = new SequenceFile.Reader(this.hadoopConf, SequenceFile.Reader.file(seqFilePath))) {
//get the SequenceFile.Reader instance
//get the key and value
Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), this.hadoopConf);
@@ -299,11 +278,6 @@ public void rcFileStartRead(String sourceRcFilePath, Configuration readerSliceCo
RecordSender recordSender, TaskPluginCollector taskPluginCollector)
{
LOG.info("Start Read rc-file [{}].", sourceRcFilePath);
List<ColumnEntry> column = StorageReaderUtil
.getListColumnEntry(readerSliceConfig, COLUMN);
// warn: no default value '\N'
String nullFormat = readerSliceConfig.getString(NULL_FORMAT);

Path rcFilePath = new Path(sourceRcFilePath);
RCFileRecordReader recordReader = null;
try (FileSystem fs = FileSystem.get(rcFilePath.toUri(), hadoopConf)) {
@@ -321,8 +295,7 @@ public void rcFileStartRead(String sourceRcFilePath, Configuration readerSliceCo
txt.set(v.getData(), v.getStart(), v.getLength());
sourceLine[i] = txt.toString();
}
StorageReaderUtil.transportOneRecord(recordSender,
column, sourceLine, nullFormat, taskPluginCollector);
StorageReaderUtil.transportOneRecord(recordSender, columns, sourceLine, nullFormat, taskPluginCollector);
}
}
catch (IOException e) {
@@ -338,7 +311,7 @@ public void rcFileStartRead(String sourceRcFilePath, Configuration readerSliceCo
}
}
catch (IOException e) {
LOG.warn(String.format("Failed to close RCFileRecordReader: %s", e.getMessage()));
LOG.warn("Failed to close RCFileRecordReader: {}", e.getMessage());
}
}
}
@@ -347,130 +320,16 @@ public void orcFileStartRead(String sourceOrcFilePath, Configuration readerSlice
RecordSender recordSender, TaskPluginCollector taskPluginCollector)
{
LOG.info("Being to read the orc-file [{}].", sourceOrcFilePath);
List<ColumnEntry> column = StorageReaderUtil.getListColumnEntry(readerSliceConfig, COLUMN);
String nullFormat = readerSliceConfig.getString(NULL_FORMAT);

try {
Path orcFilePath = new Path(sourceOrcFilePath);
Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(hadoopConf));
TypeDescription schema = reader.getSchema();
assert column != null;
if (column.isEmpty()) {
for (int i = 0; i < schema.getChildren().size(); i++) {
ColumnEntry columnEntry = new ColumnEntry();
columnEntry.setIndex(i);
columnEntry.setType(schema.getChildren().get(i).getCategory().getName());
column.add(columnEntry);
}
}

VectorizedRowBatch rowBatch = schema.createRowBatch(1024);
org.apache.orc.RecordReader rowIterator = reader.rows(reader.options().schema(schema));
while (rowIterator.nextBatch(rowBatch)) {
transportOrcRecord(rowBatch, column, recordSender, taskPluginCollector, nullFormat);
}
}
catch (Exception e) {
String message = String.format("Exception occurred while reading the file [%s].", sourceOrcFilePath);
LOG.error(message);
throw AddaxException.asAddaxException(IO_ERROR, message);
}
}

private void transportOrcRecord(VectorizedRowBatch rowBatch, List<ColumnEntry> columns, RecordSender recordSender,
TaskPluginCollector taskPluginCollector, String nullFormat)
{
Record record;
for (int row = 0; row < rowBatch.size; row++) {
record = recordSender.createRecord();
try {
for (ColumnEntry column : columns) {

Column columnGenerated;
if (column.getValue() != null) {
if (!"null".equals(column.getValue())) {
columnGenerated = new StringColumn(column.getValue());
}
else {
columnGenerated = new StringColumn();
}
record.addColumn(columnGenerated);
continue;
}
int i = column.getIndex();
String columnType = column.getType().toUpperCase();
ColumnVector col = rowBatch.cols[i];
JavaType type = JavaType.valueOf(columnType);
if (col.isNull[row]) {
record.addColumn(new StringColumn(null));
continue;
}
switch (type) {
case INT:
case LONG:
case BOOLEAN:
case BIGINT:
columnGenerated = new LongColumn(((LongColumnVector) col).vector[row]);
break;
case DATE:
columnGenerated = new DateColumn(new Date(((LongColumnVector) col).vector[row]));
break;
case FLOAT:
case DOUBLE:
columnGenerated = new DoubleColumn(((DoubleColumnVector) col).vector[row]);
break;
case DECIMAL:
columnGenerated = new DoubleColumn(((DecimalColumnVector) col).vector[row].doubleValue());
break;
case BINARY:
BytesColumnVector b = (BytesColumnVector) col;
byte[] val = Arrays.copyOfRange(b.vector[row], b.start[row], b.start[row] + b.length[row]);
columnGenerated = new BytesColumn(val);
break;
case TIMESTAMP:
// FIXME: incorrect timezone value
columnGenerated = new TimestampColumn(((TimestampColumnVector) col).getTime(row));
break;
default:
// type is string or other
String v = ((BytesColumnVector) col).toString(row);
columnGenerated = v.equals(nullFormat) ? new StringColumn() : new StringColumn(v);
break;
}
record.addColumn(columnGenerated);
}
recordSender.sendToWriter(record);
}
catch (Exception e) {
if (e instanceof AddaxException) {
throw (AddaxException) e;
}
taskPluginCollector.collectDirtyRecord(record, e.getMessage());
}
}
MyOrcReader myOrcReader = new MyOrcReader(hadoopConf, new Path(sourceOrcFilePath), nullFormat, columns);
myOrcReader.reader(recordSender, taskPluginCollector);
}

public void parquetFileStartRead(String sourceParquetFilePath, Configuration readerSliceConfig,
RecordSender recordSender, TaskPluginCollector taskPluginCollector)
{
LOG.info("Begin to read the parquet-file [{}].", sourceParquetFilePath);
String nullFormat = readerSliceConfig.getString(NULL_FORMAT);
List<ColumnEntry> column = StorageReaderUtil.getListColumnEntry(readerSliceConfig, COLUMN);
Path parquetFilePath = new Path(sourceParquetFilePath);
MyParquetReader myParquetReader = new MyParquetReader(hadoopConf, parquetFilePath, nullFormat, column);

MyParquetReader myParquetReader = new MyParquetReader(hadoopConf, parquetFilePath, nullFormat, columns);
myParquetReader.reader(recordSender, taskPluginCollector);
}

private TypeDescription getOrcSchema(String filePath)
{
Path path = new Path(filePath);
try {
Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(hadoopConf));
return reader.getSchema();
}
catch (IOException e) {
throw AddaxException.asAddaxException(IO_ERROR, "IO exception occurred when reading orc file");
}
}
}
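For reference, the relocated ORC read loop (visible in the removed lines above) follows the standard org.apache.orc pattern. A condensed, standalone restatement of it, with a hypothetical input path and the 1024-row batch size used by the removed code; the types used here are the ones imported at the top of the pre-change file:

    Path orcFilePath = new Path("/tmp/sample.orc");  // hypothetical input file
    Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(hadoopConf));
    TypeDescription schema = reader.getSchema();
    VectorizedRowBatch rowBatch = schema.createRowBatch(1024);
    org.apache.orc.RecordReader rowIterator = reader.rows(reader.options().schema(schema));
    while (rowIterator.nextBatch(rowBatch)) {
        for (int row = 0; row < rowBatch.size; row++) {
            // Convert each ColumnVector cell to an Addax Column by type, as the
            // removed transportOrcRecord did, then hand the Record to the sender.
        }
    }
    rowIterator.close();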