[update][plugin][hdfsreader] Refactor: extract Parquet handling logic into separate class
wgzhao committed Oct 22, 2024
1 parent 87bddc6 commit 738fc23
Showing 3 changed files with 326 additions and 240 deletions.
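In short: the ~170-line inline Parquet path in DFSUtil collapses to a delegation to the new reader class. The new call site, excerpted from the diff below:

    Path parquetFilePath = new Path(sourceParquetFilePath);
    MyParquetReader myParquetReader = new MyParquetReader(hadoopConf, parquetFilePath, nullFormat, column);
    myParquetReader.reader(recordSender, taskPluginCollector);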
DFSUtil.java
@@ -23,10 +23,7 @@
import com.alibaba.fastjson2.JSONObject;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import com.wgzhao.addax.common.base.Key;
import com.wgzhao.addax.common.element.BoolColumn;
import com.wgzhao.addax.common.element.BytesColumn;
import com.wgzhao.addax.common.element.Column;
import com.wgzhao.addax.common.element.ColumnEntry;
@@ -41,8 +38,6 @@
import com.wgzhao.addax.common.plugin.TaskPluginCollector;
import com.wgzhao.addax.common.util.Configuration;
import com.wgzhao.addax.storage.reader.StorageReaderUtil;
import org.apache.avro.Conversions;
import org.apache.avro.generic.GenericData;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -66,35 +61,24 @@
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.orc.TypeDescription;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static com.wgzhao.addax.common.base.Key.COLUMN;
import static com.wgzhao.addax.common.base.Key.NULL_FORMAT;
@@ -111,11 +95,6 @@ public class DFSUtil
{
private static final Logger LOG = LoggerFactory.getLogger(DFSUtil.class);

// the Julian day offset: day 2440588 is 1970-01-01
private static final int JULIAN_EPOCH_OFFSET_DAYS = 2440588;
private static final long MILLIS_IN_DAY = TimeUnit.DAYS.toMillis(1);
private static final long NANOS_PER_MILLISECOND = TimeUnit.MILLISECONDS.toNanos(1);

private static final int DIRECTORY_SIZE_GUESS = 16 * 1024;
private final org.apache.hadoop.conf.Configuration hadoopConf;
private final boolean haveKerberos;
@@ -482,177 +461,12 @@ public void parquetFileStartRead(String sourceParquetFilePath, Configuration rea
RecordSender recordSender, TaskPluginCollector taskPluginCollector)
{
LOG.info("Begin to read the parquet-file [{}].", sourceParquetFilePath);
List<ColumnEntry> column = StorageReaderUtil.getListColumnEntry(readerSliceConfig, COLUMN);
String nullFormat = readerSliceConfig.getString(NULL_FORMAT);
List<ColumnEntry> column = StorageReaderUtil.getListColumnEntry(readerSliceConfig, COLUMN);
Path parquetFilePath = new Path(sourceParquetFilePath);
MyParquetReader myParquetReader = new MyParquetReader(hadoopConf, parquetFilePath, nullFormat, column);

hadoopConf.set("parquet.avro.readInt96AsFixed", "true");
JobConf conf = new JobConf(hadoopConf);

GenericData decimalSupport = new GenericData();
decimalSupport.addLogicalTypeConversion(new Conversions.DecimalConversion());
try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), parquetFilePath)
.withConf(conf)
.build()) {
Group group = reader.read();
MessageType schema = ParquetFileReader.open(HadoopInputFile.fromPath(parquetFilePath, hadoopConf)).getFileMetaData().getSchema();

if (null == column || column.isEmpty()) {

List<org.apache.parquet.schema.Type> fields = schema.getFields();
column = new ArrayList<>(fields.size());

// the user did not configure column info, so build it from the parquet file schema
for (int i = 0; i < schema.getFields().size(); i++) {
ColumnEntry columnEntry = new ColumnEntry();
columnEntry.setIndex(i);
columnEntry.setType(getJavaType(fields.get(i)).name());
column.add(columnEntry);
}
}
while (group != null) {
transportParquetRecord(column, group, schema.getFields(), recordSender, taskPluginCollector, nullFormat);
group = reader.read();
}
}
catch (IOException e) {
String message = String.format("IO exception occurred while reading the parquet-file [%s]", sourceParquetFilePath);
LOG.error(message);
throw AddaxException.asAddaxException(IO_ERROR, message);
}
}

private static JavaType getJavaType(Type field)
{
if (field.isPrimitive()) {
switch (field.asPrimitiveType().getPrimitiveTypeName()) {
case INT32:
return JavaType.INT;
case INT64:
return JavaType.LONG;
case INT96:
return JavaType.TIMESTAMP;
case FLOAT:
return JavaType.FLOAT;
case DOUBLE:
return JavaType.DOUBLE;
case BOOLEAN:
return JavaType.BOOLEAN;
case FIXED_LEN_BYTE_ARRAY:
return JavaType.BINARY;
default:
return JavaType.STRING; //Binary as string
}
}
else {
return JavaType.STRING;
}
}

private void transportParquetRecord(List<ColumnEntry> columnConfigs, Group gRecord, List<Type> fields, RecordSender recordSender,
TaskPluginCollector taskPluginCollector, String nullFormat)
{
Record record = recordSender.createRecord();
Column columnGenerated;
int scale = 10;
try {
for (ColumnEntry columnEntry : columnConfigs) {
String columnType = columnEntry.getType();
Integer columnIndex = columnEntry.getIndex();
String columnConst = columnEntry.getValue();
String columnValue;
if (columnConst != null) {
record.addColumn(new StringColumn(columnConst));
continue;
}

if (fields.get(columnIndex).getRepetition() == Type.Repetition.OPTIONAL
&& gRecord.getFieldRepetitionCount(columnIndex)== 0) {
record.addColumn(new StringColumn(nullFormat));
continue;
}

JavaType type = JavaType.valueOf(columnType.toUpperCase());
try {
switch (type) {
case STRING:
columnGenerated = new StringColumn(gRecord.getString(columnIndex, 0));
break;
case INT:
columnGenerated = new LongColumn(gRecord.getInteger(columnIndex, 0));
break;
case LONG:
columnGenerated = new LongColumn(gRecord.getLong(columnIndex, 0));
break;
case FLOAT:
columnGenerated = new DoubleColumn(gRecord.getFloat(columnIndex, 0));
break;
case DOUBLE:
columnGenerated = new DoubleColumn(gRecord.getDouble(columnIndex, 0));
break;
case DECIMAL:
// get decimal value
columnValue = gRecord.getString(columnIndex, 0);
if (null == columnValue) {
columnGenerated = new DoubleColumn((Double) null);
}
else {
columnGenerated = new DoubleColumn(new BigDecimal(columnValue).setScale(scale, RoundingMode.HALF_UP));
}
break;
case BOOLEAN:
columnGenerated = new BoolColumn(gRecord.getBoolean(columnIndex, 0));
break;
case DATE:
columnValue = gRecord.getString(columnIndex, 0);
if (columnValue == null) {
columnGenerated = new DateColumn((Date) null);
}
else {
String formatString = columnEntry.getFormat();
if (StringUtils.isNotBlank(formatString)) {
// convert using the user-configured format
SimpleDateFormat format = new SimpleDateFormat(formatString);
columnGenerated = new DateColumn(format.parse(columnValue));
}
else {
// let the framework attempt the conversion
columnGenerated = new DateColumn(new StringColumn(columnValue).asDate());
}
}
break;
case TIMESTAMP:
Binary binaryTs = gRecord.getInt96(columnIndex, 0);
columnGenerated = new DateColumn(new Date(getTimestampMills(binaryTs)));
break;
case BINARY:
columnGenerated = new BytesColumn(gRecord.getBinary(columnIndex, 0).getBytes());
break;
default:
// try to convert it to string
LOG.debug("try to convert column type {} to String, ", columnType);
columnGenerated = new StringColumn(gRecord.getString(columnIndex, 0));
}
}
catch (Exception e) {
throw new IllegalArgumentException(String.format(
"Can not convert column type %s to %s: %s", columnType, type, e));
}
record.addColumn(columnGenerated);
} // end for

recordSender.sendToWriter(record);
}
catch (IllegalArgumentException | IndexOutOfBoundsException iae) {
taskPluginCollector.collectDirtyRecord(record, iae.getMessage());
}
catch (Exception e) {
if (e instanceof AddaxException) {
throw (AddaxException) e;
}
// cast failed means dirty data, including number format, date format, etc.
taskPluginCollector.collectDirtyRecord(record, e.getMessage());
}
myParquetReader.reader(recordSender, taskPluginCollector);
}

private TypeDescription getOrcSchema(String filePath)
@@ -847,55 +661,4 @@ private boolean isParquetFile(Path file)
}
return false;
}

/**
* Returns GMT's timestamp from binary encoded parquet timestamp (12 bytes - julian date + time of day nanos).
*
* @param timestampBinary INT96 parquet timestamp
* @return timestamp in millis, GMT timezone
*/
public static long getTimestampMills(Binary timestampBinary)
{
if (timestampBinary.length() != 12) {
return 0;
}
byte[] bytes = timestampBinary.getBytes();

return getTimestampMills(bytes);
}

public static long getTimestampMills(byte[] bytes)
{
assert bytes.length == 12;
// little endian encoding - need to invert byte order
long timeOfDayNanos = Longs.fromBytes(bytes[7], bytes[6], bytes[5], bytes[4], bytes[3], bytes[2], bytes[1], bytes[0]);
int julianDay = Ints.fromBytes(bytes[11], bytes[10], bytes[9], bytes[8]);

return julianDayToMillis(julianDay) + (timeOfDayNanos / NANOS_PER_MILLISECOND);
}

private static long julianDayToMillis(int julianDay)
{
return (julianDay - JULIAN_EPOCH_OFFSET_DAYS) * MILLIS_IN_DAY;
}

private enum JavaType
{
TINYINT,
SMALLINT,
INT,
INTEGER,
BIGINT,
FLOAT,
DOUBLE,
TIMESTAMP,
DATE,
DECIMAL,
STRING,
VARCHAR,
CHAR,
LONG,
BOOLEAN,
BINARY
}
}
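For reference, the INT96 timestamp decoding removed above (and presumably carried over into the new reader class) can be sanity-checked with plain JDK byte handling. A standalone sketch; the 12-byte value is a made-up example, not data from this commit:

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class Int96DecodeDemo
{
    // same constants the removed DFSUtil code used
    private static final int JULIAN_EPOCH_OFFSET_DAYS = 2440588; // Julian day of 1970-01-01
    private static final long MILLIS_IN_DAY = 86_400_000L;

    public static void main(String[] args)
    {
        // INT96 layout: 8 bytes time-of-day nanos + 4 bytes Julian day, both little-endian,
        // which is why the removed code inverted the byte order via Longs/Ints.fromBytes.
        ByteBuffer enc = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN);
        enc.putLong(3_600_000_000_000L); // 01:00:00 expressed as nanos of the day
        enc.putInt(2_440_589);           // Julian day of 1970-01-02

        ByteBuffer dec = ByteBuffer.wrap(enc.array()).order(ByteOrder.LITTLE_ENDIAN);
        long timeOfDayNanos = dec.getLong();
        int julianDay = dec.getInt();
        long millis = (julianDay - JULIAN_EPOCH_OFFSET_DAYS) * MILLIS_IN_DAY
                + timeOfDayNanos / 1_000_000L;
        System.out.println(millis); // 90000000 -> 1970-01-02T01:00:00 GMT
    }
}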
JavaType.java
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.wgzhao.addax.plugin.reader.hdfsreader;

public enum JavaType {
TINYINT,
SMALLINT,
INT,
INTEGER,
BIGINT,
FLOAT,
DOUBLE,
TIMESTAMP,
DATE,
DECIMAL,
STRING,
VARCHAR,
CHAR,
LONG,
BOOLEAN,
BINARY
}
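The enum constants correspond to the type names a user may put in the column config; the reader resolves them case-insensitively, as the removed DFSUtil code did:

    // e.g. a column configured as {"type": "bigint"} resolves to:
    JavaType type = JavaType.valueOf("bigint".toUpperCase()); // JavaType.BIGINT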
MyParquetReader.java (diff did not load)
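Since the MyParquetReader diff did not render, here is a minimal sketch of what the extracted class plausibly looks like, reconstructed from the logic removed from DFSUtil above. The field names, the IO_ERROR import path, and the elided conversion bodies are assumptions, not the committed source:

package com.wgzhao.addax.plugin.reader.hdfsreader;

import com.wgzhao.addax.common.element.ColumnEntry;
import com.wgzhao.addax.common.exception.AddaxException;
import com.wgzhao.addax.common.plugin.RecordSender;
import com.wgzhao.addax.common.plugin.TaskPluginCollector;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.MessageType;

import java.io.IOException;
import java.util.List;

import static com.wgzhao.addax.common.spi.ErrorCode.IO_ERROR; // assumed import path

public class MyParquetReader
{
    private final org.apache.hadoop.conf.Configuration hadoopConf;
    private final Path path;
    private final String nullFormat;
    private List<ColumnEntry> columns;

    public MyParquetReader(org.apache.hadoop.conf.Configuration hadoopConf, Path path,
                           String nullFormat, List<ColumnEntry> columns)
    {
        this.hadoopConf = hadoopConf;
        this.path = path;
        this.nullFormat = nullFormat;
        this.columns = columns;
        // read INT96 as raw fixed bytes, as the removed DFSUtil code configured
        hadoopConf.set("parquet.avro.readInt96AsFixed", "true");
    }

    public void reader(RecordSender recordSender, TaskPluginCollector taskPluginCollector)
    {
        try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), path)
                .withConf(new JobConf(hadoopConf))
                .build()) {
            MessageType schema = ParquetFileReader.open(HadoopInputFile.fromPath(path, hadoopConf))
                    .getFileMetaData().getSchema();
            if (columns == null || columns.isEmpty()) {
                // no columns configured: derive index/type entries from the file schema,
                // mirroring the removed getJavaType(...) mapping
            }
            Group group;
            while ((group = reader.read()) != null) {
                // per record: convert each configured column of the Group to an Addax Column
                // (STRING/INT/LONG/FLOAT/DOUBLE/DECIMAL/BOOLEAN/DATE/TIMESTAMP/BINARY),
                // honor nullFormat for absent OPTIONAL fields, then
                // recordSender.sendToWriter(record); dirty rows go to taskPluginCollector
            }
        }
        catch (IOException e) {
            throw AddaxException.asAddaxException(IO_ERROR,
                    "IO exception occurred while reading the parquet-file " + path);
        }
    }
}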
