Skip to content

Commit d4e778f

Browse files
committed
add local file import support
1 parent 8b77dd3 commit d4e778f

File tree

23 files changed

+222
-171
lines changed

23 files changed

+222
-171
lines changed

create-ln.sh

+6
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,12 @@ do
1010
ln -s $f /opt/data/tis/libs/plugins/${f##*/}
1111
done ;
1212

13+
cd /opt/misc/tis-plugins-commercial
14+
sh /opt/misc/tis-plugins-commercial/create-ln.sh
15+
16+
cd /opt/misc/tis-sqlserver-plugin
17+
sh /opt/misc/tis-sqlserver-plugin/create-ln.sh
18+
1319
#for tis-scala-compiler-dependencies
1420
#rm -f /opt/data/tis/libs/tis-scala-compiler-dependencies/*
1521
#cd ./tis-scala-compiler-dependencies

datax-config/src/main/java/com/qlangtech/tis/plugin/ds/RunningContext.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -24,5 +24,6 @@
2424
**/
2525
public
2626
interface RunningContext {
27-
27+
public String getDbName();
28+
public String getTable();
2829
}

tis-base-test/src/main/java/com/qlangtech/tis/order/dump/task/MockDataSourceFactory.java

+10-12
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,16 @@
2121
import com.google.common.collect.Maps;
2222
import com.qlangtech.tis.common.utils.Assert;
2323
import com.qlangtech.tis.manage.common.TisUTF8;
24-
import com.qlangtech.tis.plugin.ds.*;
24+
import com.qlangtech.tis.plugin.ds.ColumnMetaData;
25+
import com.qlangtech.tis.plugin.ds.DBConfig;
26+
import com.qlangtech.tis.plugin.ds.DataDumpers;
27+
import com.qlangtech.tis.plugin.ds.DataSourceFactory;
28+
import com.qlangtech.tis.plugin.ds.DataType;
29+
import com.qlangtech.tis.plugin.ds.IDataSourceDumper;
30+
import com.qlangtech.tis.plugin.ds.JDBCTypes;
31+
import com.qlangtech.tis.plugin.ds.RunningContext;
32+
import com.qlangtech.tis.plugin.ds.TISTable;
33+
import com.qlangtech.tis.plugin.ds.TableInDB;
2534
import com.qlangtech.tis.sql.parser.tuple.creator.EntityName;
2635
import org.apache.commons.io.IOUtils;
2736
import org.apache.commons.lang.StringUtils;
@@ -31,7 +40,6 @@
3140
import java.sql.Connection;
3241
import java.sql.DriverManager;
3342
import java.sql.SQLException;
34-
import java.sql.Types;
3543
import java.util.Iterator;
3644
import java.util.List;
3745
import java.util.Map;
@@ -46,7 +54,6 @@ protected Connection getConnection(String jdbcUrl, String username, String passw
4654
return DriverManager.getConnection(jdbcUrl, StringUtils.trimToNull(username), StringUtils.trimToNull(password));
4755
}
4856

49-
5057
@Override
5158
public DBConfig getDbConfig() {
5259
throw new IllegalStateException();
@@ -61,15 +68,6 @@ public void visitFirstConnection(IConnProcessor connProcessor) {
6168
public void refresh() {
6269

6370
}
64-
// @Override
65-
// public void refectTableInDB(TableInDB tabs, Connection conn) throws SQLException {
66-
// throw new UnsupportedOperationException();
67-
// }
68-
69-
// @Override
70-
// public String identityValue() {
71-
// return "mockDs";
72-
// }
7371

7472
/**
7573
* 模拟Employee表的导入

tis-console/src/main/java/com/qlangtech/tis/coredefine/module/action/DataxAction.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -1180,8 +1180,8 @@ public void doUpdateDatax(Context context) throws Exception {
11801180

11811181
IAppSource.cleanAppSourcePluginStoreCache(null, dataxName);
11821182
IAppSource.cleanAppSourcePluginStoreCache(this, dataxName);
1183-
SelectedTabExtend.clearTabExtend(null,dataxName);
1184-
SelectedTabExtend.clearTabExtend(this,dataxName);
1183+
SelectedTabExtend.clearTabExtend(null, dataxName);
1184+
SelectedTabExtend.clearTabExtend(this, dataxName);
11851185

11861186
DataXJobSubmit.getPowerJobSubmit().ifPresent((submit) -> {
11871187
submit.saveJob(this, context, old);
@@ -1348,7 +1348,7 @@ public void doGetTableMapper(Context context) {
13481348
}
13491349

13501350
if (!dataxReader.hasMulitTable()) {
1351-
throw new IllegalStateException("reader has not set table at least");
1351+
throw new IllegalStateException("reader (" + dataxReader.getClass().getSimpleName() + ") has not set table at least");
13521352
}
13531353
List<TableAlias> tmapList = Lists.newArrayList();
13541354
for (ISelectedTab selectedTab : dataxReader.getSelectedTabs()) {
@@ -1550,7 +1550,7 @@ public boolean validate(IFieldErrorHandler msgHandler, Context context, String f
15501550
addErrorMessage(context, "请至少选择一个主键列");
15511551
postMCols.validateFaild = true;
15521552
}
1553-
1553+
writerCols.addAll(postMCols.writerCols);
15541554
return !postMCols.validateFaild;
15551555
}
15561556
}))) {

tis-manage-pojo/src/main/java/com/qlangtech/tis/plugin/ds/ColumnMetaData.java

+20
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.List;
2626
import java.util.Map;
2727
import java.util.TreeMap;
28+
import java.util.concurrent.atomic.AtomicInteger;
2829
import java.util.function.Function;
2930
import java.util.stream.Collectors;
3031

@@ -50,6 +51,25 @@ public static CMeta convert(ColumnMetaData c) {
5051
return c.convert();
5152
}
5253

54+
public static List<ColumnMetaData> convert(List<CMeta> cs) {
55+
int[] index = new int[1];
56+
return cs.stream().map((cm) -> {
57+
//int index, String key, DataType type, boolean pk, boolean nullable
58+
return new ColumnMetaData(index[0]++, cm.getName(), cm.getType(), cm.isPk(), cm.isNullable());
59+
60+
}).collect(Collectors.toList());
61+
62+
// ColumnMetaData c = this;
63+
// CMeta cmeta = createCmeta();
64+
// cmeta.setName(c.getName());
65+
// cmeta.setComment(c.getComment());
66+
// cmeta.setPk(c.isPk());
67+
// cmeta.setType(c.getType());
68+
// cmeta.setNullable(c.isNullable());
69+
//
70+
// return c.convert();
71+
}
72+
5373
public static void fillSelectedTabMeta(ISelectedTab tab,
5474
Function<ISelectedTab, Map<String, ColumnMetaData>> tableColsMetaGetter) {
5575
Map<String, ColumnMetaData> colsMeta = tableColsMetaGetter.apply(tab);

tis-manage-pojo/src/main/java/com/qlangtech/tis/plugin/ds/DefaultTab.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import java.util.Collections;
66
import java.util.List;
77
import java.util.Optional;
8+
import java.util.stream.Collectors;
89

910
/**
1011
* Licensed to the Apache Software Foundation (ASF) under one
@@ -39,12 +40,14 @@ public DefaultTab(String dataXName, List<CMeta> writerCols) {
3940

4041
@Override
4142
public List<String> getPrimaryKeys() {
42-
throw new UnsupportedOperationException();
43+
return writerCols.stream()
44+
.filter((col) -> col.isPk())
45+
.map((col) -> col.getName()).collect(Collectors.toList());
4346
}
4447

4548
@Override
46-
public List<IColMetaGetter> overwriteCols(IMessageHandler pluginCtx,boolean includeContextParams) {
47-
throw new UnsupportedOperationException();
49+
public List<IColMetaGetter> overwriteCols(IMessageHandler pluginCtx, boolean includeContextParams) {
50+
return writerCols.stream().collect(Collectors.toList());
4851
}
4952

5053
public DefaultTab(String tabName) {

tis-manage-pojo/src/main/java/com/qlangtech/tis/plugin/ds/IdlistElementCreatorFactory.java

-55
This file was deleted.

tis-plugin/src/main/java/com/qlangtech/tis/config/hive/meta/HiveTable.java

+13-8
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import java.util.List;
2222
import java.util.Optional;
23+
import java.util.Properties;
2324

2425
/**
2526
* @author: 百岁([email protected]
@@ -83,25 +84,29 @@ public HiveTable(String name) {
8384
this.name = name;
8485
}
8586

86-
public static class StoredAs {
87+
public static abstract class StoredAs {
8788
public final String inputFormat;
8889
public final String outputFormat;
8990

90-
private final Object serdeInfo;
91+
// private final Object serdeInfo;
9192

9293
/**
9394
* @param inputFormat
9495
* @param outputFormat
95-
* @param serdeInfo org.apache.hadoop.hive.metastore.api.SerdeInfo
96+
* // @param serdeInfo org.apache.hadoop.hive.metastore.api.SerdeInfo
9697
*/
97-
public StoredAs(String inputFormat, String outputFormat, Object serdeInfo) {
98+
public StoredAs(String inputFormat, String outputFormat) {
9899
this.inputFormat = inputFormat;
99100
this.outputFormat = outputFormat;
100-
this.serdeInfo = serdeInfo;
101+
// this.serdeInfo = serdeInfo;
101102
}
102103

103-
public <SerDeInfo> SerDeInfo getSerdeInfo() {
104-
return (SerDeInfo) serdeInfo;
105-
}
104+
public abstract Properties getSerdeProperties(HiveTable table);
105+
106+
public abstract String getSerializationLib();
107+
108+
// public <SerDeInfo> SerDeInfo getSerdeInfo() {
109+
// return (SerDeInfo) serdeInfo;
110+
// }
106111
}
107112
}

tis-plugin/src/main/java/com/qlangtech/tis/datax/AdapterDataxReader.java

+1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.qlangtech.tis.plugin.ds.ColumnMetaData;
2222
import com.qlangtech.tis.plugin.ds.ISelectedTab;
2323
import com.qlangtech.tis.plugin.ds.JDBCConnection;
24+
import com.qlangtech.tis.plugin.ds.RunningContext;
2425
import com.qlangtech.tis.plugin.ds.TableInDB;
2526
import com.qlangtech.tis.plugin.ds.TableNotFoundException;
2627
import com.qlangtech.tis.sql.parser.tuple.creator.EntityName;

tis-plugin/src/main/java/com/qlangtech/tis/datax/SourceColMetaGetter.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public SourceColMetaGetter(IDataxReader dataXReader) {
5252

5353
private Map<String, ColumnMetaData> getColMetaDataMap(IDataxReader dataXReader, TableMap tableMapper) {
5454
try {
55-
return ColumnMetaData.toMap(dataXReader.getTableMetadata(false, EntityName.parse(tableMapper.getFrom())));
55+
return ColumnMetaData.toMap(dataXReader.getTableMetadata(false, tableMapper));
5656
} catch (TableNotFoundException e) {
5757
throw new RuntimeException(e);
5858
}

tis-plugin/src/main/java/com/qlangtech/tis/plugin/datax/SelectedTab.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,11 @@ public List<IColMetaGetter> overwriteCols(IMessageHandler pluginCtx, boolean inc
113113
if (transformerRules.isPresent()) {
114114
ITransformerBuildInfo transformerBuilder = transformerRules.get().createTransformerBuildInfo((IPluginContext) pluginCtx);
115115

116+
List<OutputParameter> overwriteColsWithContextParams
117+
= transformerBuilder.overwriteColsWithContextParams(this.getCols());
118+
116119
List<OutputParameter> outParams = includeContextParams
117-
? transformerBuilder.overwriteColsWithContextParams(this.getCols())
120+
? overwriteColsWithContextParams
118121
: transformerBuilder.tranformerColsWithoutContextParams();
119122
return outParams.stream().map((param) -> param).collect(Collectors.toList());
120123

tis-plugin/src/main/java/com/qlangtech/tis/plugin/datax/transformer/jdbcprop/JdbcPropertyElementCreatorFactory.java

+1
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ public void appendExternalJsonProp(IPropertyType propertyType, JSONObject biz) {
8484
// ElementCreatorFactory.super.appendExternalJsonProp(propertyType, biz);
8585
}
8686

87+
8788
protected List<CMeta> getColsCandidate() {
8889
List<CMeta> colsCandidate = SelectedTab.getSelectedCols();
8990
return colsCandidate;

tis-plugin/src/main/java/com/qlangtech/tis/plugin/ds/DataSourceFactory.java

+12-7
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@
6363
**/
6464
@Public
6565
public abstract class DataSourceFactory implements Describable<DataSourceFactory>, Serializable, DBIdentity, DataSourceMeta, Wrapper {
66-
// public static final ZoneId DEFAULT_SERVER_TIME_ZONE = MQListenerFactory.DEFAULT_SERVER_TIME_ZONE; // ZoneId.systemDefault();// ZoneId.of("Asia/Shanghai");
66+
// public static final ZoneId DEFAULT_SERVER_TIME_ZONE = MQListenerFactory.DEFAULT_SERVER_TIME_ZONE; // ZoneId.systemDefault();// ZoneId.of("Asia/Shanghai");
6767
public static final String DS_TYPE_MYSQL = "MySQL";
6868
public static final String DS_TYPE_MYSQL_V8 = DS_TYPE_MYSQL + "-V8";
6969

@@ -258,7 +258,7 @@ protected List<ColumnMetaData> parseTableColMeta(boolean inSink, String jdbcUrl,
258258
pkCols.add(columnName);
259259
}
260260

261-
return wrapColsMeta(inSink, table, columns1, pkCols);
261+
return wrapColsMeta(inSink, table, columns1, pkCols, conn);
262262
} finally {
263263
closeResultSet(columns1);
264264
closeResultSet(primaryKeys);
@@ -267,8 +267,9 @@ protected List<ColumnMetaData> parseTableColMeta(boolean inSink, String jdbcUrl,
267267
// return columns;
268268
}
269269

270-
public List<ColumnMetaData> wrapColsMeta(boolean inSink, EntityName table, ResultSet columns1) throws SQLException, TableNotFoundException {
271-
return wrapColsMeta(inSink, table, columns1, Collections.emptySet());
270+
public List<ColumnMetaData> wrapColsMeta(
271+
boolean inSink, EntityName table, ResultSet columns1, JDBCConnection conn) throws SQLException, TableNotFoundException {
272+
return wrapColsMeta(inSink, table, columns1, Collections.emptySet(), conn);
272273
}
273274

274275
public static final String KEY_COLUMN_NAME = "COLUMN_NAME";
@@ -281,9 +282,13 @@ public List<ColumnMetaData> wrapColsMeta(boolean inSink, EntityName table, Resul
281282
public static final String KEY_DATA_TYPE = "DATA_TYPE";
282283
public static final String KEY_COLUMN_SIZE = "COLUMN_SIZE";
283284

284-
public List<ColumnMetaData> wrapColsMeta(
285-
boolean inSink, EntityName table, ResultSet columns1, Set<String> pkCols) throws SQLException, TableNotFoundException {
286-
return this.wrapColsMeta(inSink, table, columns1, new CreateColumnMeta(pkCols, columns1));
285+
public final List<ColumnMetaData> wrapColsMeta(
286+
boolean inSink, EntityName table, ResultSet columns1, Set<String> pkCols, JDBCConnection conn) throws SQLException, TableNotFoundException {
287+
return this.wrapColsMeta(inSink, table, columns1, createColumnMetaBuilder(table, columns1, pkCols, conn));
288+
}
289+
290+
protected CreateColumnMeta createColumnMetaBuilder(EntityName table, ResultSet columns1, Set<String> pkCols, JDBCConnection conn) {
291+
return new CreateColumnMeta(pkCols, columns1);
287292
}
288293

289294
public List<ColumnMetaData> wrapColsMeta(

tis-plugin/src/main/java/com/qlangtech/tis/plugin/ds/DataSourceMeta.java

+4
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020

2121
import com.google.common.collect.Maps;
22+
import com.qlangtech.tis.datax.IDataxProcessor.TableMap;
2223
import com.qlangtech.tis.extension.Describable;
2324
import com.qlangtech.tis.sql.parser.tuple.creator.EntityName;
2425

@@ -52,6 +53,9 @@ default TableInDB getTablesInDB() {
5253
throw new UnsupportedOperationException();
5354
}
5455

56+
default List<ColumnMetaData> getTableMetadata(boolean inSink, TableMap tableMapper) throws TableNotFoundException {
57+
return getTableMetadata(inSink, EntityName.parse(tableMapper.getFrom()));
58+
}
5559

5660
/**
5761
* Get table column metaData list

0 commit comments

Comments
 (0)