[improve][plugin][hdfswriter] enhance the output for greater clarity
wgzhao committed Sep 6, 2024
1 parent 07aa414 commit e093389
Showing 4 changed files with 32 additions and 44 deletions.
Changed file 1 of 4:
@@ -27,9 +27,15 @@
import com.wgzhao.addax.common.exception.AddaxException;
import com.wgzhao.addax.common.util.Configuration;
import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.Trash;
import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,11 +45,13 @@
import java.util.Objects;
import java.util.Set;

+import static com.wgzhao.addax.common.base.Key.HAVE_KERBEROS;
+import static com.wgzhao.addax.common.base.Key.KERBEROS_KEYTAB_FILE_PATH;
+import static com.wgzhao.addax.common.base.Key.KERBEROS_PRINCIPAL;

public class HdfsHelper
{
private static final Logger LOG = LoggerFactory.getLogger(HdfsHelper.class);
-private static final String HADOOP_SECURITY_AUTHENTICATION_KEY = "hadoop.security.authentication";
-private static final String HDFS_DEFAULT_FS_KEY = "fs.defaultFS";

protected FileSystem fileSystem = null;
protected JobConf conf = null;
@@ -61,14 +69,14 @@ protected void getFileSystem(Configuration taskConfig)
hadoopConf.set(each, hadoopSiteParamsAsJsonObject.getString(each));
}
}
-hadoopConf.set(HDFS_DEFAULT_FS_KEY, defaultFS);
+hadoopConf.set("fs.defaultFS", defaultFS);

//是否有Kerberos认证
-boolean haveKerberos = taskConfig.getBool(Key.HAVE_KERBEROS, false);
+boolean haveKerberos = taskConfig.getBool(HAVE_KERBEROS, false);
if (haveKerberos) {
-String kerberosKeytabFilePath = taskConfig.getString(Key.KERBEROS_KEYTAB_FILE_PATH);
-String kerberosPrincipal = taskConfig.getString(Key.KERBEROS_PRINCIPAL);
-hadoopConf.set(HADOOP_SECURITY_AUTHENTICATION_KEY, "kerberos");
+String kerberosKeytabFilePath = taskConfig.getString(KERBEROS_KEYTAB_FILE_PATH);
+String kerberosPrincipal = taskConfig.getString(KERBEROS_PRINCIPAL);
+hadoopConf.set("hadoop.security.authentication", "kerberos");
// fix Failed to specify server's Kerberos principal name
if (Objects.equals(hadoopConf.get("dfs.namenode.kerberos.principal", ""), "")) {
// get REALM
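The keytab path and principal read in the added lines above are what Hadoop's UserGroupInformation (already imported by this class) needs for the actual Kerberos login. A minimal sketch of that step, with a hypothetical method name and example arguments rather than code from this commit:

private static void kerberosLogin(String kerberosPrincipal, String kerberosKeytabFilePath,
        org.apache.hadoop.conf.Configuration hadoopConf) throws IOException
{
    hadoopConf.set("hadoop.security.authentication", "kerberos");
    UserGroupInformation.setConfiguration(hadoopConf);
    // Performs the keytab-based login; throws IOException if the keytab cannot be read
    // or the principal does not match it.
    UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath);
}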
@@ -172,7 +180,7 @@ public void deleteFilesFromDir(Path dir, boolean skipTrash)
if (skipTrash) {
while (files.hasNext()) {
final LocatedFileStatus next = files.next();
-LOG.info("Delete file [{}]", next.getPath());
+LOG.info("Delete the file [{}]", next.getPath());
fileSystem.delete(next.getPath(), false);
}
}
@@ -183,7 +191,7 @@ public void deleteFilesFromDir(Path dir, boolean skipTrash)
final Trash trash = new Trash(hadoopConf);
while (files.hasNext()) {
final LocatedFileStatus next = files.next();
-LOG.info("Move file [{}] to Trash", next.getPath());
+LOG.info("Move the file [{}] to Trash", next.getPath());
trash.moveToTrash(next.getPath());
}
}
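As a usage note on the two deletion paths shown above: with skipTrash the file is removed immediately, while the Trash branch keeps it recoverable, which only works when the cluster enables the trash feature (fs.trash.interval > 0). A minimal sketch under those assumptions, with a hypothetical helper name, not code from this commit:

private static void removeOne(FileSystem fileSystem, org.apache.hadoop.conf.Configuration hadoopConf,
        Path target, boolean skipTrash) throws IOException
{
    if (skipTrash) {
        // permanent delete; the second argument is false because only plain files are expected here
        fileSystem.delete(target, false);
    }
    else {
        // moveToTrash() returns false when the trash feature is disabled on the cluster
        new Trash(hadoopConf).moveToTrash(target);
    }
}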
@@ -223,15 +231,15 @@ public void moveFilesToDest(Path sourceDir, Path targetDir)
final FileStatus[] fileStatuses = fileSystem.listStatus(sourceDir);
for (FileStatus file : fileStatuses) {
if (file.isFile() && file.getLen() > 0) {
-LOG.info("Begin to move file from [{}] to [{}].", file.getPath(), targetDir.getName());
+LOG.info("Begin to move the file [{}] to [{}].", file.getPath(), targetDir);
fileSystem.rename(file.getPath(), new Path(targetDir, file.getPath().getName()));
}
}
}
catch (IOException e) {
throw AddaxException.asAddaxException(HdfsWriterErrorCode.IO_ERROR, e);
}
-LOG.info("Finish move file(s).");
+LOG.info("Finish moving file(s).");
}

//关闭FileSystem
Changed file 2 of 4:
@@ -55,7 +55,7 @@ public static class Job

private static final String SKIP_TRASH = "skipTrash";

-// 写入文件的临时目录,完成写入后,该目录需要删除
+// The temporary directory where the file(s) are written will be deleted after the write operation is complete.
private String tmpStorePath;
private Configuration writerSliceConfig = null;
private String path;
@@ -116,8 +116,8 @@ private void validateParameter()
if (eachColumnConf.getString(Key.TYPE).toUpperCase().startsWith("DECIMAL")) {
String type = eachColumnConf.getString(Key.TYPE);
eachColumnConf.set(Key.TYPE, "decimal");
-eachColumnConf.set(Key.PRECISION, getDecimalPrecision(type));
-eachColumnConf.set(Key.SCALE, getDecimalScale(type));
+eachColumnConf.set(Key.PRECISION, (Object) getDecimalPrecision(type));
+eachColumnConf.set(Key.SCALE, (Object) getDecimalScale(type));
columns.set(i, eachColumnConf);
rewriteFlag = true;
}
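The getDecimalPrecision and getDecimalScale helpers used above are not shown in this diff; the explicit (Object) casts in the new lines presumably pin Configuration.set to its Object overload when the helpers return a primitive int. A hypothetical sketch of parsing a DECIMAL(p,s) type string this way (the default values are assumptions, not values taken from the plugin):

private static int getDecimalPrecision(String type)
{
    if (!type.contains("(")) {
        return 38; // assumed default precision when none is declared
    }
    String body = type.substring(type.indexOf('(') + 1, type.indexOf(')'));
    return Integer.parseInt(body.split(",")[0].trim());
}

private static int getDecimalScale(String type)
{
    if (!type.contains("(")) {
        return 10; // assumed default scale when none is declared
    }
    String[] parts = type.substring(type.indexOf('(') + 1, type.indexOf(')')).split(",");
    return parts.length > 1 ? Integer.parseInt(parts[1].trim()) : 0;
}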
@@ -200,28 +200,26 @@ private void validateParameter()
@Override
public void prepare()
{
-//临时存放路径

this.tmpStorePath = buildTmpFilePath(path);

-//若路径已经存在,检查path是否是目录
+// Verify whether the path is a directory if it exists.
if (hdfsHelper.isPathExists(path)) {
if (!hdfsHelper.isPathDir(path)) {
throw AddaxException.asAddaxException(HdfsWriterErrorCode.ILLEGAL_VALUE,
String.format("The item path you configured [%s] is exists ,but it is not directory", path));
}

-//根据writeMode对目录下文件进行处理
-// 写入之前,当前目录下已有的文件,根据writeMode判断是否覆盖
Path[] existFilePaths = hdfsHelper.hdfsDirList(path);

boolean isExistFile = existFilePaths.length > 0;
if ("append".equals(writeMode)) {
-LOG.info("The current writeMode is append, so no cleanup is performed before writing. " +
-"Files with the prefix [{}] are written under the [{}] directory.", fileName, path);
+LOG.info("The current write mode is set to 'append', no cleanup is performed before writing. " +
+"Files with the prefix [{}] are written in the [{}] directory.", fileName, path);
}
else if ("nonConflict".equals(writeMode) && isExistFile) {
throw AddaxException.asAddaxException(HdfsWriterErrorCode.ILLEGAL_VALUE,
-String.format("The current writeMode is nonConflict, but the directory [%s] is not empty, it includes sub-path(s): [%s]",
+String.format("The current writeMode is set to 'nonConflict', but the directory [%s] is not empty, it includes the sub-path(s): [%s]",
path, String.join(",", Arrays.stream(existFilePaths).map(Path::getName).collect(Collectors.toSet()))));
}
}
@@ -267,14 +265,13 @@ public List<Configuration> split(int mandatoryNumber)
for (int i = 0; i < mandatoryNumber; i++) {
// handle same file name
Configuration splitTaskConfig = this.writerSliceConfig.clone();
tmpFullFileName = String.format("%s/%s_%s.%s", tmpStorePath, filePrefix, FileHelper.generateFileMiddleName(), fileType);
endFullFileName = String.format("%s/%s_%s.%s", path, filePrefix, FileHelper.generateFileMiddleName(), fileType);

-// 如果文件已经存在,则重新生成文件名
-while (allFiles.contains(endFullFileName)) {
+do {
tmpFullFileName = String.format("%s/%s_%s.%s", tmpStorePath, filePrefix, FileHelper.generateFileMiddleName(), fileType);
endFullFileName = String.format("%s/%s_%s.%s", path, filePrefix, FileHelper.generateFileMiddleName(), fileType);
}
+while (allFiles.contains(endFullFileName));
allFiles.add(endFullFileName);

splitTaskConfig.set(Key.FILE_NAME, tmpFullFileName);
@@ -373,7 +370,6 @@ public void init()
{

this.writerSliceConfig = this.getPluginJobConf();

}

@Override
@@ -405,19 +401,6 @@ public void startWrite(RecordReceiver lineReceiver)
String fileName = this.writerSliceConfig.getString(Key.FILE_NAME);
LOG.info("Begin to write file : [{}]", fileName);
hdfsHelper.write(lineReceiver, writerSliceConfig, fileName, getTaskPluginCollector());
-// if ("TEXT".equals(fileType)) {
-// //写TEXT FILE
-// hdfsHelper.textFileStartWrite(lineReceiver, writerSliceConfig, fileName, getTaskPluginCollector());
-// }
-// else if ("ORC".equals(fileType)) {
-// //写ORC FILE
-// hdfsHelper.writeFS(lineReceiver, writerSliceConfig, fileName, getTaskPluginCollector());
-// }
-// else if ("PARQUET".equals(fileType)) {
-// //写Parquet FILE
-// hdfsHelper.parquetFileStartWrite(lineReceiver, writerSliceConfig, fileName, getTaskPluginCollector());
-// }

LOG.info("Finish write");
}
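The commented-out per-format branching deleted above has been superseded by the single hdfsHelper.write(...) call. A rough sketch of what such a dispatcher can look like; the method names come from the removed comments and the "fileType" key is an assumption, so this is not the actual HdfsHelper code:

public void write(RecordReceiver lineReceiver, Configuration config, String fileName,
        TaskPluginCollector collector)
{
    // the configured output format of the writer: text, orc or parquet
    String fileType = config.getString("fileType", "text").toUpperCase();
    switch (fileType) {
        case "TEXT":
            textFileStartWrite(lineReceiver, config, fileName, collector);
            break;
        case "ORC":
            writeFS(lineReceiver, config, fileName, collector);
            break;
        case "PARQUET":
            parquetFileStartWrite(lineReceiver, config, fileName, collector);
            break;
        default:
            throw new IllegalArgumentException("Unsupported fileType: " + fileType);
    }
}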

Changed file 3 of 4:
@@ -76,8 +76,7 @@ public ParquetWriter(Configuration conf)
}

/*
-* 写Parquet file类型文件
-* 一个parquet文件的schema类似如下:
+* the schema of a parquet file is as follows:
* {
* "type": "record",
* "name": "testFile",
Changed file 4 of 4:
@@ -104,7 +104,6 @@ public static MutablePair<Text, Boolean> transportOneRecord(
Record record, char fieldDelimiter, List<Configuration> columnsConfiguration, TaskPluginCollector taskPluginCollector)
{
MutablePair<List<Object>, Boolean> transportResultList = transportOneRecord(record, columnsConfiguration, taskPluginCollector);
-//保存<转换后的数据,是否是脏数据>
MutablePair<Text, Boolean> transportResult = new MutablePair<>();
transportResult.setRight(false);
Text recordResult = new Text(StringUtils.join(transportResultList.getLeft(), fieldDelimiter));
@@ -130,7 +129,6 @@ public static MutablePair<List<Object>, Boolean> transportOneRecord(
String rowData = column.getRawData().toString();
SupportHiveDataType columnType = SupportHiveDataType.valueOf(
columnsConfiguration.get(i).getString(Key.TYPE).toUpperCase());
-//根据writer端类型配置做类型转换
try {
switch (columnType) {
case TINYINT:
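The Chinese comment removed above ("convert the value according to the writer-side column type configuration") described the switch that begins at case TINYINT. A compressed sketch of that idea using the standard Addax Column accessors; every enum constant other than TINYINT is assumed here rather than taken from the diff:

private static Object convertByConfiguredType(Column column, SupportHiveDataType columnType)
{
    switch (columnType) {
        case TINYINT:
        case SMALLINT:
        case INT:
        case BIGINT:
            return column.asLong();
        case FLOAT:
        case DOUBLE:
            return column.asDouble();
        case BOOLEAN:
            return column.asBoolean();
        default:
            // STRING, VARCHAR, CHAR and anything unrecognized fall back to the string form
            return column.asString();
    }
}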
