plugin hdfsreader: simplify parquet file reader code
wgzhao committed Jun 29, 2021
1 parent bd4aa41 commit ce8cfbf
Showing 1 changed file with 169 additions and 141 deletions.
@@ -46,6 +46,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
@@ -74,6 +75,7 @@

 import java.io.IOException;
 import java.io.InputStream;
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
@@ -356,31 +358,23 @@ public void orcFileStartRead(String sourceOrcFilePath, Configuration readerSlice
         String nullFormat = readerSliceConfig.getString(NULL_FORMAT);
 
         try {
-            // indexes of the fields to read; specified fields may be read instead of all of them
-            List<Integer> readColumnIndex = new ArrayList<>();
-
             Path orcFilePath = new Path(sourceOrcFilePath);
             Reader reader = OrcFile.createReader(orcFilePath, OrcFile.readerOptions(hadoopConf));
             TypeDescription schema = reader.getSchema();
-            // decide whether to read all columns; true when:
-            // 1. no column entries are configured
-            // 2. column is configured as an empty list
-            // 3. column is configured as ["*"]
-            if (null == column || column.isEmpty() || (column.size() == 1 && "*".equals(column.get(0).getValue()))) {
+            assert column != null;
+            if (column.isEmpty()) {
                 for (int i = 0; i < schema.getChildren().size(); i++) {
-                    readColumnIndex.add(i);
-                }
-            }
-            else {
-                for (ColumnEntry col : column) {
-                    readColumnIndex.add(col.getIndex());
+                    ColumnEntry columnEntry = new ColumnEntry();
+                    columnEntry.setIndex(i);
+                    columnEntry.setType(schema.getChildren().get(i).getCategory().getName());
+                    column.add(columnEntry);
                 }
             }
 
             VectorizedRowBatch rowBatch = schema.createRowBatch(1024);
             org.apache.orc.RecordReader rowIterator = reader.rows(reader.options().schema(schema));
             while (rowIterator.nextBatch(rowBatch)) {
-                transportOrcRecord(rowBatch, readColumnIndex, column, recordSender, taskPluginCollector, nullFormat);
+                transportOrcRecord(rowBatch, column, recordSender, taskPluginCollector, nullFormat);
             }
         }
         catch (Exception e) {
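
Side note on the ORC hunk above: when no columns are configured, the patched code now materializes one ColumnEntry per top-level schema field, typed by the lowercase ORC category name. A minimal, standalone sketch of that discovery step against the public org.apache.orc API — the schema string here is hypothetical, where the plugin calls reader.getSchema() instead:

    import org.apache.orc.TypeDescription;

    public class OrcSchemaProbe
    {
        public static void main(String[] args)
        {
            // Hypothetical schema; the plugin obtains it from OrcFile.createReader(...).getSchema()
            TypeDescription schema = TypeDescription.fromString("struct<id:bigint,name:string,price:decimal(10,2)>");
            for (int i = 0; i < schema.getChildren().size(); i++) {
                // getCategory().getName() yields the lowercase type name, e.g. "bigint", "string", "decimal"
                System.out.printf("index=%d type=%s%n", i, schema.getChildren().get(i).getCategory().getName());
            }
        }
    }
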
@@ -391,23 +385,35 @@ public void orcFileStartRead(String sourceOrcFilePath, Configuration readerSlice
         }
     }
 
-    private void transportOrcRecord(VectorizedRowBatch rowBatch, List<Integer> columnIndex, List<ColumnEntry> columns, RecordSender recordSender,
-            TaskPluginCollector taskPluginCollector, String nullFormat)
+    private void transportOrcRecord(VectorizedRowBatch rowBatch, List<ColumnEntry> columns, RecordSender recordSender, TaskPluginCollector taskPluginCollector, String nullFormat)
     {
         Record record;
         for (int row = 0; row < rowBatch.size; row++) {
             record = recordSender.createRecord();
             try {
-                for (int i : columnIndex) {
+                for (ColumnEntry column : columns) {
+
                     Column columnGenerated;
+                    if (column.getValue() != null) {
+                        if (!"null".equals(column.getValue())) {
+                            columnGenerated = new StringColumn(column.getValue());
+                        }
+                        else {
+                            columnGenerated = null;
+                        }
+                        record.addColumn(columnGenerated);
+                        continue;
+                    }
+                    int i = column.getIndex();
+                    String columnType = column.getType().toUpperCase();
                     ColumnVector col = rowBatch.cols[i];
-                    String columnType = columns.get(i).getType().toUpperCase();
                     Type type = Type.valueOf(columnType);
                     if (col.isNull[row]) {
                         record.addColumn(null);
                         continue;
                     }
                     switch (type) {
+                        case INT:
                         case LONG:
                         case DATE:
                         case BOOLEAN:
@@ -416,19 +422,21 @@ record = recordSender.createRecord();
                         case DOUBLE:
                             columnGenerated = new DoubleColumn(((DoubleColumnVector) col).vector[row]);
                             break;
-                        case STRING:
-                            columnGenerated = new StringColumn(((BytesColumnVector) col).toString(row));
+                        case DECIMAL:
+                            columnGenerated = new DoubleColumn(((DecimalColumnVector) col).vector[row].doubleValue());
                             break;
                         case BINARY:
                             BytesColumnVector b = (BytesColumnVector) col;
                             byte[] val = Arrays.copyOfRange(b.vector[row], b.start[row], b.start[row] + b.length[row]);
                             columnGenerated = new BytesColumn(val);
                             break;
                         case TIMESTAMP:
-                            columnGenerated = new DateColumn(((TimestampColumnVector) col).getTimestampAsLong(row) * 1000);
+                            columnGenerated = new DateColumn(((TimestampColumnVector) col).getTime(row));
                             break;
                         default:
-                            columnGenerated = new StringColumn(((BytesColumnVector) col).toString(row));
+                            // type is string or other
+                            String v = ((BytesColumnVector) col).toString(row);
+                            columnGenerated = v.equals(nullFormat) ? null : new StringColumn(v);
                             break;
                     }
                     record.addColumn(columnGenerated);
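
The TIMESTAMP change above is a precision fix as much as a cleanup: in Hive's TimestampColumnVector (both methods appear in the diff), getTimestampAsLong(row) returns epoch seconds, so the old `* 1000` multiplied away the sub-second part, while getTime(row) returns epoch milliseconds directly. A small sketch illustrating the difference with a hypothetical value:

    import java.sql.Timestamp;
    import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;

    public class TimestampVectorDemo
    {
        public static void main(String[] args)
        {
            TimestampColumnVector tcv = new TimestampColumnVector(1);
            // hypothetical instant with a 123 ms fractional part
            tcv.set(0, Timestamp.from(java.time.Instant.ofEpochMilli(1624924800123L)));

            long seconds = tcv.getTimestampAsLong(0); // epoch seconds: the sub-second part is gone
            long millis = tcv.getTime(0);             // epoch milliseconds: precision preserved

            System.out.println(seconds * 1000); // 1624924800000 -- the old code path, drops the 123 ms
            System.out.println(millis);         // 1624924800123 -- the new code path
        }
    }
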
@@ -448,49 +456,58 @@ public void parquetFileStartRead(String sourceParquestFilePath, Configuration re
                                       RecordSender recordSender, TaskPluginCollector taskPluginCollector)
     {
         LOG.info("Start Read orcfile [{}].", sourceParquestFilePath);
-        List<ColumnEntry> column = StorageReaderUtil
-                .getListColumnEntry(readerSliceConfig, COLUMN);
+        List<ColumnEntry> column = StorageReaderUtil.getListColumnEntry(readerSliceConfig, COLUMN);
         String nullFormat = readerSliceConfig.getString(NULL_FORMAT);
-        boolean isReadAllColumns = false;
-        int columnIndexMax;
-        // decide whether to read all columns
-        if (null == column || column.isEmpty()) {
-            int allColumnsCount = getAllColumnsCount(sourceParquestFilePath);
-            columnIndexMax = allColumnsCount - 1;
-            isReadAllColumns = true;
-        }
-        else {
-            columnIndexMax = getMaxIndex(column);
-        }
-        if (columnIndexMax >= 0) {
-            JobConf conf = new JobConf(hadoopConf);
-            Path parquetFilePath = new Path(sourceParquestFilePath);
-            // InputFile inputFile =
-            GenericData decimalSupport = new GenericData();
-            decimalSupport.addLogicalTypeConversion(new Conversions.DecimalConversion());
-            try (ParquetReader<GenericData.Record> reader = AvroParquetReader
-                    .<GenericData.Record>builder(parquetFilePath)
-                    .withDataModel(decimalSupport)
-                    .withConf(conf)
-                    .build()) {
-                GenericData.Record gRecord = reader.read();
-                Schema schema = gRecord.getSchema();
-                while (gRecord != null) {
-                    transportOneRecord(column, gRecord, schema, recordSender, taskPluginCollector, isReadAllColumns, nullFormat
-                    );
-                    gRecord = reader.read();
-                }
-            }
-            catch (IOException e) {
-                String message = String.format("An exception occurred while reading data from the parquet file path [%s]. Please contact the system administrator."
-                        , sourceParquestFilePath);
-                LOG.error(message);
-                throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message);
-            }
-        }
-        else {
-            String message = String.format("Please verify that the configured columns are correct! columnIndexMax is less than 0, column: %s", JSON.toJSONString(column));
-            throw DataXException.asDataXException(HdfsReaderErrorCode.BAD_CONFIG_VALUE, message);
-        }
+        Path parquetFilePath = new Path(sourceParquestFilePath);
+        boolean isReadAllColumns = null == column || column.isEmpty();
+
+        JobConf conf = new JobConf(hadoopConf);
+
+        GenericData decimalSupport = new GenericData();
+        decimalSupport.addLogicalTypeConversion(new Conversions.DecimalConversion());
+        try (ParquetReader<GenericData.Record> reader = AvroParquetReader
+                .<GenericData.Record>builder(parquetFilePath)
+                .withDataModel(decimalSupport)
+                .withConf(conf)
+                .build()) {
+            GenericData.Record gRecord = reader.read();
+            Schema schema = gRecord.getSchema();
+
+            if (null == column || column.isEmpty()) {
+                column = new ArrayList<>(schema.getFields().size());
+                String stype;
+                // the user did not provide concrete column info, so build it from the parquet file itself
+                for (int i = 0; i < schema.getFields().size(); i++) {
+                    ColumnEntry columnEntry = new ColumnEntry();
+                    columnEntry.setIndex(i);
+                    Schema type;
+                    if (schema.getFields().get(i).schema().getType() == Schema.Type.UNION) {
+                        type = schema.getFields().get(i).schema().getTypes().get(1);
+                    }
+                    else {
+                        type = schema.getFields().get(i).schema();
+                    }
+                    stype = type.getProp("logicalType") != null ? type.getProp("logicalType") : type.getType().getName();
+                    if (stype.startsWith("timestamp")) {
+                        columnEntry.setType("timestamp");
+                    }
+                    else {
+                        columnEntry.setType(stype);
+                    }
+                    column.add(columnEntry);
+                }
+            }
+            while (gRecord != null) {
+                transportOneRecord(column, gRecord, schema, recordSender, taskPluginCollector, isReadAllColumns, nullFormat
+                );
+                gRecord = reader.read();
+            }
+        }
+        catch (IOException e) {
+            String message = String.format("An exception occurred while reading data from the parquet file path [%s]. Please contact the system administrator."
+                    , sourceParquestFilePath);
+            LOG.error(message);
+            throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message);
+        }
     }

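For the schema-inference block just added: when Parquet data comes through the Avro bridge, a nullable column surfaces as a UNION of ["null", actualType], and annotated columns (decimal, date, timestamp-millis) expose a logicalType property — which is why the code unwraps getTypes().get(1) and prefers getProp("logicalType"). A minimal sketch with a hand-written schema; the record layout here is hypothetical, where the plugin takes the schema from the first record it reads:

    import org.apache.avro.Schema;

    public class AvroTypeProbe
    {
        public static void main(String[] args)
        {
            Schema schema = new Schema.Parser().parse(
                    "{\"type\":\"record\",\"name\":\"r\",\"fields\":["
                    + "{\"name\":\"id\",\"type\":\"long\"},"
                    + "{\"name\":\"ts\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}]}]}");

            for (Schema.Field field : schema.getFields()) {
                Schema type = field.schema();
                if (type.getType() == Schema.Type.UNION) {
                    // nullable column: [null, actual]; take the non-null branch
                    type = type.getTypes().get(1);
                }
                String stype = type.getProp("logicalType") != null ? type.getProp("logicalType") : type.getType().getName();
                System.out.println(field.name() + " -> " + stype); // id -> long, ts -> timestamp-millis
            }
        }
    }
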
@@ -504,95 +521,106 @@ private void transportOneRecord(List<ColumnEntry> columnConfigs, GenericData.Rec
     {
         Record record = recordSender.createRecord();
         Column columnGenerated;
+        int scale = 10;
         try {
-            if (isReadAllColumns) {
-                for (int i = 0; i < schema.getFields().size(); i++) {
-                    record.addColumn(new StringColumn((String) gRecord.get(i)));
-                }
-            }
-            else {
-                for (ColumnEntry columnEntry : columnConfigs) {
-                    String columnType = columnEntry.getType();
-                    Integer columnIndex = columnEntry.getIndex();
-                    String columnConst = columnEntry.getValue();
-                    String columnValue = null;
-                    if (null != columnIndex) {
-                        if (null != gRecord.get(columnIndex)) {
-                            columnValue = gRecord.get(columnIndex).toString();
-                        } else {
-                            record.addColumn(null);
-                            continue;
-                        }
-                    }
-                    else {
-                        columnValue = columnConst;
-                    }
-                    Type type = Type.valueOf(columnType.toUpperCase());
-                    if (StringUtils.equals(columnValue, nullFormat)) {
-                        columnValue = null;
-                    }
-                    try {
-                        switch (type) {
-                            case STRING:
-                                columnGenerated = new StringColumn(columnValue);
-                                break;
-                            case LONG:
-                                columnGenerated = new LongColumn(columnValue);
-                                break;
-                            case DOUBLE:
-                                columnGenerated = new DoubleColumn(columnValue);
-                                break;
-                            case BOOLEAN:
-                                columnGenerated = new BoolColumn(columnValue);
-                                break;
-                            case DATE:
-                                if (columnValue == null) {
-                                    columnGenerated = new DateColumn((Date) null);
-                                }
-                                else {
-                                    String formatString = columnEntry.getFormat();
-                                    if (StringUtils.isNotBlank(formatString)) {
-                                        // format conversion configured by the user
-                                        SimpleDateFormat format = new SimpleDateFormat(
-                                                formatString);
-                                        columnGenerated = new DateColumn(
-                                                format.parse(columnValue));
-                                    }
-                                    else {
-                                        // let the framework try to convert it
-                                        columnGenerated = new DateColumn(
-                                                new StringColumn(columnValue)
-                                                        .asDate());
-                                    }
-                                }
-                                break;
-                            case TIMESTAMP:
-                                if ( null == columnValue) {
-                                    columnGenerated = null;
-                                } else {
-                                    columnGenerated = new DateColumn(Long.parseLong(columnValue) * 1000);
-                                }
-                                break;
-                            case BINARY:
-                                columnGenerated = new BytesColumn(((ByteBuffer)gRecord.get(columnIndex)).array());
-                                break;
-                            default:
-                                String errorMessage = String.format(
-                                        "The configured column type is not supported yet: [%s]", columnType);
-                                LOG.error(errorMessage);
-                                throw DataXException
-                                        .asDataXException(
-                                                StorageReaderErrorCode.NOT_SUPPORT_TYPE,
-                                                errorMessage);
-                        }
-                    }
-                    catch (Exception e) {
-                        throw new IllegalArgumentException(String.format(
-                                "Type conversion error: cannot convert [%s] to [%s], %s", columnValue, type, e));
-                    }
-                    record.addColumn(columnGenerated);
-                } // end for
-            } // end else
+            for (ColumnEntry columnEntry : columnConfigs) {
+                String columnType = columnEntry.getType();
+                Integer columnIndex = columnEntry.getIndex();
+                String columnConst = columnEntry.getValue();
+                String columnValue = null;
+                if (null != columnIndex) {
+                    if (null != gRecord.get(columnIndex)) {
+                        columnValue = gRecord.get(columnIndex).toString();
+                    }
+                    else {
+                        record.addColumn(null);
+                        continue;
+                    }
+                }
+                else {
+                    columnValue = columnConst;
+                }
+                if (columnType.startsWith("decimal(")) {
+                    String ps = columnType.replace("decimal(", "").replace(")", "");
+                    columnType = "decimal";
+                    if (ps.contains(",")) {
+                        scale = Integer.parseInt(ps.split(",")[1].trim());
+                    }
+                    else {
+                        scale = 0;
+                    }
+                }
+                Type type = Type.valueOf(columnType.toUpperCase());
+                if (StringUtils.equals(columnValue, nullFormat)) {
+                    columnValue = null;
+                }
+                try {
+                    switch (type) {
+                        case STRING:
+                            columnGenerated = new StringColumn(columnValue);
+                            break;
+                        case INT:
+                        case LONG:
+                            columnGenerated = new LongColumn(columnValue);
+                            break;
+                        case DOUBLE:
+                            columnGenerated = new DoubleColumn(columnValue);
+                            break;
+                        case DECIMAL:
+                            columnGenerated = new DoubleColumn(new BigDecimal(columnValue).setScale(scale, BigDecimal.ROUND_HALF_UP));
+                            break;
+                        case BOOLEAN:
+                            columnGenerated = new BoolColumn(columnValue);
+                            break;
+                        case DATE:
+                            if (columnValue == null) {
+                                columnGenerated = new DateColumn((Date) null);
+                            }
+                            else {
+                                String formatString = columnEntry.getFormat();
+                                if (StringUtils.isNotBlank(formatString)) {
+                                    // format conversion configured by the user
+                                    SimpleDateFormat format = new SimpleDateFormat(
+                                            formatString);
+                                    columnGenerated = new DateColumn(
+                                            format.parse(columnValue));
+                                }
+                                else {
+                                    // let the framework try to convert it
+                                    columnGenerated = new DateColumn(
+                                            new StringColumn(columnValue)
+                                                    .asDate());
+                                }
+                            }
+                            break;
+                        case TIMESTAMP:
+                            if (null == columnValue) {
+                                columnGenerated = null;
+                            }
+                            else {
+                                columnGenerated = new DateColumn(Long.parseLong(columnValue) * 1000);
+                            }
+                            break;
+                        case BINARY:
+                            columnGenerated = new BytesColumn(((ByteBuffer) gRecord.get(columnIndex)).array());
+                            break;
+                        default:
+                            String errorMessage = String.format(
+                                    "The configured column type is not supported yet: [%s]", columnType);
+                            LOG.error(errorMessage);
+                            throw DataXException
+                                    .asDataXException(
+                                            StorageReaderErrorCode.NOT_SUPPORT_TYPE,
+                                            errorMessage);
+                    }
+                }
+                catch (Exception e) {
+                    throw new IllegalArgumentException(String.format(
+                            "Type conversion error: cannot convert [%s] to [%s], %s", columnValue, type, e));
+                }
+                record.addColumn(columnGenerated);
+            } // end for
 
             recordSender.sendToWriter(record);
         }
         catch (IllegalArgumentException | IndexOutOfBoundsException iae) {
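
To make the new decimal path concrete: the declared type string is parsed for its scale, then the value is rounded half-up to that scale before being shipped as a DoubleColumn. A standalone sketch of just that parsing and rounding; RoundingMode.HALF_UP is the non-deprecated equivalent of the BigDecimal.ROUND_HALF_UP constant used in the diff, and the column declaration is hypothetical:

    import java.math.BigDecimal;
    import java.math.RoundingMode;

    public class DecimalScaleDemo
    {
        public static void main(String[] args)
        {
            String columnType = "decimal(20,4)"; // hypothetical column declaration
            int scale = 10;                      // same default as the patched reader
            if (columnType.startsWith("decimal(")) {
                String ps = columnType.replace("decimal(", "").replace(")", "");
                scale = ps.contains(",") ? Integer.parseInt(ps.split(",")[1].trim()) : 0;
            }
            BigDecimal value = new BigDecimal("3.14159265").setScale(scale, RoundingMode.HALF_UP);
            System.out.println(value); // 3.1416
        }
    }
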
@@ -830,6 +858,6 @@ private boolean isParquetFile(Path file, FSDataInputStream in)

     private enum Type
     {
-        STRING, LONG, BOOLEAN, DOUBLE, DATE, BINARY, TIMESTAMP,
+        INT, STRING, LONG, BOOLEAN, DOUBLE, DATE, BINARY, TIMESTAMP, DECIMAL,
     }
 }
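
Taken together, the simplified reader boils down to the AvroParquetReader-with-decimal-conversion pattern the diff uses. A minimal end-to-end sketch of that pattern outside the plugin — the local file path is hypothetical, and the plugin wires in an HDFS JobConf instead of the default configuration:

    import org.apache.avro.Conversions;
    import org.apache.avro.generic.GenericData;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.avro.AvroParquetReader;
    import org.apache.parquet.hadoop.ParquetReader;

    public class ParquetScan
    {
        public static void main(String[] args) throws Exception
        {
            // Hypothetical local file; the plugin resolves paths on HDFS instead.
            Path path = new Path("file:///tmp/sample.parquet");
            GenericData model = new GenericData();
            // without this conversion, decimal logical types surface as raw fixed/bytes values
            model.addLogicalTypeConversion(new Conversions.DecimalConversion());
            try (ParquetReader<GenericData.Record> reader = AvroParquetReader
                    .<GenericData.Record>builder(path)
                    .withDataModel(model)
                    .build()) {
                GenericData.Record rec;
                while ((rec = reader.read()) != null) {
                    System.out.println(rec);
                }
            }
        }
    }
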
