diff --git a/plugin/reader/hivereader/src/main/java/com/wgzhao/addax/plugin/reader/hivereader/HiveReader.java b/plugin/reader/hivereader/src/main/java/com/wgzhao/addax/plugin/reader/hivereader/HiveReader.java index ac4494d77..76427e8a6 100644 --- a/plugin/reader/hivereader/src/main/java/com/wgzhao/addax/plugin/reader/hivereader/HiveReader.java +++ b/plugin/reader/hivereader/src/main/java/com/wgzhao/addax/plugin/reader/hivereader/HiveReader.java @@ -19,6 +19,9 @@ package com.wgzhao.addax.plugin.reader.hivereader; +import com.wgzhao.addax.common.element.Column; +import com.wgzhao.addax.common.element.DoubleColumn; +import com.wgzhao.addax.common.element.TimestampColumn; import com.wgzhao.addax.common.exception.AddaxException; import com.wgzhao.addax.common.plugin.RecordSender; import com.wgzhao.addax.common.spi.Reader; @@ -30,6 +33,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.UnsupportedEncodingException; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Types; import java.util.List; import static com.wgzhao.addax.common.base.Constant.DEFAULT_FETCH_SIZE; @@ -39,22 +47,19 @@ import static com.wgzhao.addax.common.base.Key.KERBEROS_PRINCIPAL; public class HiveReader - extends Reader -{ + extends Reader { private static final DataBaseType DATABASE_TYPE = DataBaseType.Hive; public static class Job - extends Reader.Job - { + extends Reader.Job { private static final Logger LOG = LoggerFactory.getLogger(Job.class); private Configuration originalConfig = null; private CommonRdbmsReader.Job commonRdbmsReaderJob; @Override - public void init() - { + public void init() { this.originalConfig = getPluginJobConf(); boolean haveKerberos = originalConfig.getBool(HAVE_KERBEROS, false); @@ -71,37 +76,31 @@ public void init() } @Override - public void preCheck() - { + public void preCheck() { this.commonRdbmsReaderJob.preCheck(originalConfig, DATABASE_TYPE); } @Override - public List split(int adviceNumber) - { + public List split(int adviceNumber) { return this.commonRdbmsReaderJob.split(originalConfig, adviceNumber); } @Override - public void post() - { + public void post() { this.commonRdbmsReaderJob.post(originalConfig); } @Override - public void destroy() - { + public void destroy() { this.commonRdbmsReaderJob.destroy(originalConfig); } - private void kerberosAuthentication(String kerberosPrincipal, String kerberosKeytabFilePath, org.apache.hadoop.conf.Configuration hadoopConf) - { + private void kerberosAuthentication(String kerberosPrincipal, String kerberosKeytabFilePath, org.apache.hadoop.conf.Configuration hadoopConf) { if (StringUtils.isNotBlank(kerberosPrincipal) && StringUtils.isNotBlank(kerberosKeytabFilePath)) { UserGroupInformation.setConfiguration(hadoopConf); try { UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath); - } - catch (Exception e) { + } catch (Exception e) { String message = String.format("Auth failure with kerberos, Please check " + "kerberosKeytabFilePath[%s] and kerberosPrincipal[%s]", kerberosKeytabFilePath, kerberosPrincipal); @@ -112,37 +111,45 @@ private void kerberosAuthentication(String kerberosPrincipal, String kerberosKey } public static class Task - extends Reader.Task - { + extends Reader.Task { private Configuration readerSliceConfig; private CommonRdbmsReader.Task commonRdbmsReaderTask; @Override - public void init() - { + public void init() { this.readerSliceConfig = getPluginJobConf(); - this.commonRdbmsReaderTask = new CommonRdbmsReader.Task(DATABASE_TYPE, getTaskGroupId(), getTaskId()); + this.commonRdbmsReaderTask = new CommonRdbmsReader.Task(DATABASE_TYPE, getTaskGroupId(), getTaskId()) { + + @Override + protected Column createColumn(ResultSet rs, ResultSetMetaData metaData, int i) + throws SQLException, UnsupportedEncodingException { + if (metaData.getColumnType(i) == Types.TIMESTAMP ) { + // hive HiveBaseResultSet#getTimestamp(String columnName, Calendar cal) not support + return new TimestampColumn(rs.getTimestamp(i)); + } + return super.createColumn(rs, metaData, i); + } + + }; + this.commonRdbmsReaderTask.init(this.readerSliceConfig); } @Override - public void startRead(RecordSender recordSender) - { + public void startRead(RecordSender recordSender) { int fetchSize = this.readerSliceConfig.getInt(FETCH_SIZE, DEFAULT_FETCH_SIZE); this.commonRdbmsReaderTask.startRead(readerSliceConfig, recordSender, getTaskPluginCollector(), fetchSize); } @Override - public void post() - { + public void post() { this.commonRdbmsReaderTask.post(readerSliceConfig); } @Override - public void destroy() - { + public void destroy() { this.commonRdbmsReaderTask.destroy(readerSliceConfig); } }