
Commit 317c0ba

fix: validation feature
zhp8341 committed Jun 25, 2022
1 parent bef0fcb commit 317c0ba
Showing 5 changed files with 155 additions and 87 deletions.
30 changes: 30 additions & 0 deletions flink-streaming-core/src/cdc-test.sql
@@ -0,0 +1,30 @@
CREATE TABLE test (
id INT NOT NULL,
name STRING,
age INT
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.79.128',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'mydb',
'table-name' = 'test',
'scan.incremental.snapshot.enabled'='false'
);
CREATE TABLE test_sink (
id INT NOT NULL,
name STRING,
age INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://192.168.79.128:3306/mydb?characterEncoding=UTF-8',
'table-name' = 'test_sink',
'username' = 'root',
'password' = '123456'
);

select * from test;

insert into test_sink select * from test;
46 changes: 46 additions & 0 deletions flink-streaming-core/src/hive-test.sql
@@ -0,0 +1,46 @@

BEGIN STATEMENT SET;

SET 'table.local-time-zone' = 'Asia/Shanghai';

CREATE CATALOG testmyhive WITH (
'type' = 'hive',
'default-database' = 'zhp',
'hive-conf-dir' = '/Users/huipeizhu/hive-conf'
);

USE CATALOG testmyhive;

drop table IF EXISTS item_test;

drop table IF EXISTS hive_flink_table;

create table item_test (
itemId BIGINT,
price BIGINT,
proctime AS PROCTIME ()
) WITH (
'connector' = 'kafka',
'topic' = 'flink-catalog-v1',
'properties.bootstrap.servers'='127.0.0.1:9092',
'properties.group.id'='test-1',
'format'='json',
'scan.startup.mode' = 'earliest-offset'
);


SET 'table.sql-dialect'='hive';

CREATE TABLE hive_flink_table (
itemId BIGINT,
price BIGINT,
ups string
) TBLPROPERTIES (
'sink.rolling-policy.rollover-interval'='1min',
'sink.partition-commit.trigger'='process-time',
'sink.partition-commit.policy.kind'='metastore,success-file'
);

SET 'table.sql-dialect'=default;

insert into hive_flink_table select itemId,price, 'XXXXaaa' as ups from item_test;
ValidationConstants.java
@@ -10,5 +10,14 @@ public class ValidationConstants {

public static final String MESSAGE_010 = "Must contain an INSERT or INSERT OVERWRITE statement";

public static final String MESSAGE_011 = "Bare SELECT statements are not supported yet; please use INSERT INTO ... SELECT syntax";
public static final String MESSAGE_011 = "Bare SELECT statements are not supported yet; please use INSERT INTO ... SELECT syntax, or use the print connector to print the results";

public static final String TABLE_SQL_DIALECT_1 = "table.sql-dialect";

public static final String INSERT = "INSERT";

public static final String SELECT = "SELECT";
public static final String SPLIT_1 = "'";
// Note: despite the name, this is the empty string, used when stripping quotes.
public static final String SPACE = "";
}
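
For context, the print connector mentioned in MESSAGE_011 lets a user inspect results without a bare SELECT. A minimal sketch of that pattern, reusing the columns of the test table from cdc-test.sql (the sink table name print_sink_demo is hypothetical):

CREATE TABLE print_sink_demo (
  id INT,
  name STRING,
  age INT
) WITH (
  'connector' = 'print'
);

insert into print_sink_demo select * from test;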
SqlValidation.java
@@ -5,12 +5,13 @@
import com.flink.streaming.common.model.SqlCommandCall;
import com.flink.streaming.common.sql.SqlFileParser;
import com.flink.streaming.sql.util.ValidationConstants;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.regex.Matcher;
import lombok.extern.slf4j.Slf4j;
import org.apache.calcite.config.Lex;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.sql.validate.SqlConformance;
import org.apache.commons.collections.CollectionUtils;
@@ -25,10 +26,6 @@
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.command.SetOperation;
import org.apache.flink.table.planner.calcite.CalciteConfig;
import org.apache.flink.table.planner.delegation.FlinkSqlParserFactories;
import org.apache.flink.table.planner.parse.CalciteParser;
@@ -49,95 +46,47 @@ public static void explainStmt(List<String> stmtList) {
.build();

TableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
TableConfig config = tEnv.getConfig();
String sql=null;

List<Operation> modifyOperationList=new ArrayList<>();
Parser parser = ((TableEnvironmentInternal) tEnv).getParser();
Operation operation=null;
String explainStmt=null;
try {
for (String stmt : stmtList) {
explainStmt=stmt;
operation= parser.parse(stmt).get(0);
//TODO Hive statements are not validated for now
if (operation.getClass().getSimpleName().
equalsIgnoreCase("CreateCatalogOperation")){
throw new RuntimeException("暂不支持SQL批任务校验");
}
log.info("operation={}", operation.getClass().getSimpleName());
switch (operation.getClass().getSimpleName()) {
// show-style statements
case "ShowTablesOperation":
case "ShowCatalogsOperation":
case "ShowCreateTableOperation":
case "ShowCurrentCatalogOperation":
case "ShowCurrentDatabaseOperation":
case "ShowDatabasesOperation":
case "ShowFunctionsOperation":
case "ShowModulesOperation":
case "ShowPartitionsOperation":
case "ShowViewsOperation":
case "ExplainOperation":
case "DescribeTableOperation":
tEnv.executeSql(stmt).print();
break;
//set
case "SetOperation":
SetOperation setOperation = (SetOperation) operation;
String key = setOperation.getKey().get();
String value = setOperation.getValue().get();
Configuration configuration = tEnv.getConfig().getConfiguration();
log.info("#############setConfiguration#############\n key={} value={}",
key, value);
configuration.setString(key, value);
break;
boolean isInsertSql = false;

case "BeginStatementSetOperation":
System.out.println("####stmt= " + stmt);
log.info("####stmt={}", stmt);
break;
case "DropTableOperation":
case "DropCatalogFunctionOperation":
case "DropTempSystemFunctionOperation":
case "DropCatalogOperation":
case "DropDatabaseOperation":
case "DropViewOperation":
case "CreateTableOperation":
case "CreateViewOperation":
case "CreateDatabaseOperation":
case "CreateTableASOperation":
case "CreateCatalogFunctionOperation":
case "CreateTempSystemFunctionOperation":
case "AlterTableOperation":
case "AlterViewOperation":
case "AlterDatabaseOperation":
case "AlterCatalogFunctionOperation":
case "UseCatalogOperation":
case "UseDatabaseOperation":
case "LoadModuleOperation":
case "UnloadModuleOperation":
case "NopOperation":
((TableEnvironmentInternal) tEnv)
.executeInternal(parser.parse(stmt).get(0));
break;
case "CatalogSinkModifyOperation":
modifyOperationList.add(operation);
break;
default:
throw new RuntimeException("不支持该语法 sql=" + stmt);
}
}
if (modifyOperationList.size() > 0) {
((TableEnvironmentInternal) tEnv).explainInternal(modifyOperationList);
boolean isSelectSql = false;

try {
for (String stmt : stmtList) {
sql = stmt.trim();
Boolean setSuccess = setSqlDialect(sql, config);
CalciteParser parser = new CalciteParser(getSqlParserConfig(config));
if (setSuccess) {
log.info("set 成功 sql={}",sql);
continue;
}
SqlNode sqlNode=parser.parse(sql);
if (ValidationConstants.INSERT.equalsIgnoreCase(sqlNode.getKind().name())) {
isInsertSql = true;
}
if (ValidationConstants.SELECT.equalsIgnoreCase(sqlNode.getKind().name())) {
isSelectSql = true;
}
log.info("sql:{} 校验通过",sql);
}
}catch (Exception e) {
log.error("语法异常: sql={} 原因是: ", explainStmt, e);
throw new RuntimeException("语法异常 sql=" + explainStmt + " 原因: " + e.getMessage());
log.error("语法错误: {} 原因是: ", sql, e);
throw new RuntimeException("语法错误:" + sql + " 原因: " + e.getMessage());
}

if (!isInsertSql) {
throw new RuntimeException(ValidationConstants.MESSAGE_010);
}
if (isSelectSql) {
throw new RuntimeException(ValidationConstants.MESSAGE_011);
}
log.info("全部语法校验成功");

}



/**
* @author zhuhuipei
* @date 2021/3/27
@@ -259,4 +208,37 @@ public static List<String> toSqlList(String sql) {
}


/**
 * Set the SQL dialect (or apply a plain config option) based on a SET statement.
 * @Param:[sql, tableConfig]
 * @return: java.lang.Boolean
 * @Author: zhuhuipei
 * @date 2022/6/24
 */
private static Boolean setSqlDialect(String sql, TableConfig tableConfig) {
final Matcher matcher = SqlCommand.SET.getPattern().matcher(sql);
if (matcher.matches()) {
final String[] groups = new String[matcher.groupCount()];
for (int i = 0; i < groups.length; i++) {
groups[i] = matcher.group(i + 1);
}
String key=groups[1].replace(ValidationConstants.SPLIT_1,ValidationConstants.SPACE).trim();
String val=groups[2];
if (ValidationConstants.TABLE_SQL_DIALECT_1.equalsIgnoreCase(key)){
if ( SqlDialect.HIVE.name().equalsIgnoreCase(
val.replace(ValidationConstants.SPLIT_1,ValidationConstants.SPACE).trim())) {
tableConfig.setSqlDialect(SqlDialect.HIVE);
} else {
tableConfig.setSqlDialect(SqlDialect.DEFAULT);
}
}else {
Configuration configuration = tableConfig.getConfiguration();
configuration.setString(key, val);
}
return true;
}
return false;
}


}
TestSqlValidation.java
@@ -16,12 +16,13 @@
* @time 22:30
*/
public class TestSqlValidation {
private static String test_sql_file = "/Users/edy/git/flink-streaming-platform-web/flink-streaming-core/src/test.sql";
private static String test_sql_file = "/Users/edy/git/flink-streaming-platform-web/flink-streaming-core/src/hive-test.sql";

@Test
public void checkSql() throws IOException {
List<String> list = Files.readAllLines(Paths.get(test_sql_file));
List<String> sqlList = SqlFileParser.parserSql(list);
SqlValidation.explainStmt(sqlList);
//SqlValidation.preCheckSql(list);
}
}
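
To illustrate the two new checks, a hypothetical companion test (not part of this commit) could assert that a script with no INSERT fails with MESSAGE_010, and that a script containing a bare SELECT fails with MESSAGE_011; both surface as a RuntimeException from explainStmt:

import java.util.Arrays;
import org.junit.Test;

public class TestSqlValidationFailures {

    // No INSERT statement at all: explainStmt should throw (MESSAGE_010 is checked first).
    @Test(expected = RuntimeException.class)
    public void missingInsertIsRejected() {
        SqlValidation.explainStmt(Arrays.asList("select * from test"));
    }

    // A bare SELECT alongside a valid INSERT: explainStmt should throw (MESSAGE_011).
    @Test(expected = RuntimeException.class)
    public void bareSelectIsRejected() {
        SqlValidation.explainStmt(Arrays.asList(
            "select * from test",
            "insert into test_sink select * from test"));
    }
}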
